summaryrefslogtreecommitdiff
path: root/cloudinit/net
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/net')
-rw-r--r--cloudinit/net/__init__.py579
-rw-r--r--cloudinit/net/activators.py87
-rw-r--r--cloudinit/net/bsd.py112
-rwxr-xr-xcloudinit/net/cmdline.py97
-rw-r--r--cloudinit/net/dhcp.py194
-rw-r--r--cloudinit/net/eni.py454
-rw-r--r--cloudinit/net/freebsd.py44
-rw-r--r--cloudinit/net/netbsd.py27
-rw-r--r--cloudinit/net/netplan.py313
-rw-r--r--cloudinit/net/network_state.py734
-rw-r--r--cloudinit/net/networkd.py208
-rw-r--r--cloudinit/net/openbsd.py33
-rw-r--r--cloudinit/net/renderer.py31
-rw-r--r--cloudinit/net/renderers.py40
-rw-r--r--cloudinit/net/sysconfig.py886
-rw-r--r--cloudinit/net/udev.py23
16 files changed, 2182 insertions, 1680 deletions
diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py
index f81f3a7b..1a738dbc 100644
--- a/cloudinit/net/__init__.py
+++ b/cloudinit/net/__init__.py
@@ -13,14 +13,13 @@ import os
import re
from typing import Any, Dict
-from cloudinit import subp
-from cloudinit import util
+from cloudinit import subp, util
from cloudinit.net.network_state import mask_to_net_prefix
from cloudinit.url_helper import UrlError, readurl
LOG = logging.getLogger(__name__)
SYS_CLASS_NET = "/sys/class/net/"
-DEFAULT_PRIMARY_INTERFACE = 'eth0'
+DEFAULT_PRIMARY_INTERFACE = "eth0"
OVS_INTERNAL_INTERFACE_LOOKUP_CMD = [
"ovs-vsctl",
"--format",
@@ -36,15 +35,17 @@ OVS_INTERNAL_INTERFACE_LOOKUP_CMD = [
]
-def natural_sort_key(s, _nsre=re.compile('([0-9]+)')):
+def natural_sort_key(s, _nsre=re.compile("([0-9]+)")):
"""Sorting for Humans: natural sort order. Can be use as the key to sort
functions.
This will sort ['eth0', 'ens3', 'ens10', 'ens12', 'ens8', 'ens0'] as
['ens0', 'ens3', 'ens8', 'ens10', 'ens12', 'eth0'] instead of the simple
python way which will produce ['ens0', 'ens10', 'ens12', 'ens3', 'ens8',
'eth0']."""
- return [int(text) if text.isdigit() else text.lower()
- for text in re.split(_nsre, s)]
+ return [
+ int(text) if text.isdigit() else text.lower()
+ for text in re.split(_nsre, s)
+ ]
def get_sys_class_path():
@@ -56,14 +57,19 @@ def sys_dev_path(devname, path=""):
return get_sys_class_path() + devname + "/" + path
-def read_sys_net(devname, path, translate=None,
- on_enoent=None, on_keyerror=None,
- on_einval=None):
+def read_sys_net(
+ devname,
+ path,
+ translate=None,
+ on_enoent=None,
+ on_keyerror=None,
+ on_einval=None,
+):
dev_path = sys_dev_path(devname, path)
try:
contents = util.load_file(dev_path)
except (OSError, IOError) as e:
- e_errno = getattr(e, 'errno', None)
+ e_errno = getattr(e, "errno", None)
if e_errno in (errno.ENOENT, errno.ENOTDIR):
if on_enoent is not None:
return on_enoent(e)
@@ -80,19 +86,26 @@ def read_sys_net(devname, path, translate=None,
if on_keyerror is not None:
return on_keyerror(e)
else:
- LOG.debug("Found unexpected (not translatable) value"
- " '%s' in '%s", contents, dev_path)
+ LOG.debug(
+ "Found unexpected (not translatable) value '%s' in '%s",
+ contents,
+ dev_path,
+ )
raise
def read_sys_net_safe(iface, field, translate=None):
def on_excp_false(e):
return False
- return read_sys_net(iface, field,
- on_keyerror=on_excp_false,
- on_enoent=on_excp_false,
- on_einval=on_excp_false,
- translate=translate)
+
+ return read_sys_net(
+ iface,
+ field,
+ on_keyerror=on_excp_false,
+ on_enoent=on_excp_false,
+ on_einval=on_excp_false,
+ translate=translate,
+ )
def read_sys_net_int(iface, field):
@@ -109,7 +122,7 @@ def is_up(devname):
# The linux kernel says to consider devices in 'unknown'
# operstate as up for the purposes of network configuration. See
# Documentation/networking/operstates.txt in the kernel source.
- translate = {'up': True, 'unknown': True, 'down': False}
+ translate = {"up": True, "unknown": True, "down": False}
return read_sys_net_safe(devname, "operstate", translate=translate)
@@ -136,7 +149,7 @@ def master_is_bridge_or_bond(devname):
return False
bonding_path = os.path.join(master_path, "bonding")
bridge_path = os.path.join(master_path, "bridge")
- return (os.path.exists(bonding_path) or os.path.exists(bridge_path))
+ return os.path.exists(bonding_path) or os.path.exists(bridge_path)
def master_is_openvswitch(devname):
@@ -195,23 +208,24 @@ def is_openvswitch_internal_interface(devname: str) -> bool:
def is_netfailover(devname, driver=None):
- """ netfailover driver uses 3 nics, master, primary and standby.
- this returns True if the device is either the primary or standby
- as these devices are to be ignored.
+ """netfailover driver uses 3 nics, master, primary and standby.
+ this returns True if the device is either the primary or standby
+ as these devices are to be ignored.
"""
if driver is None:
driver = device_driver(devname)
- if is_netfail_primary(devname, driver) or is_netfail_standby(devname,
- driver):
+ if is_netfail_primary(devname, driver) or is_netfail_standby(
+ devname, driver
+ ):
return True
return False
def get_dev_features(devname):
- """ Returns a str from reading /sys/class/net/<devname>/device/features."""
- features = ''
+ """Returns a str from reading /sys/class/net/<devname>/device/features."""
+ features = ""
try:
- features = read_sys_net(devname, 'device/features')
+ features = read_sys_net(devname, "device/features")
except Exception:
pass
return features
@@ -231,13 +245,13 @@ def has_netfail_standby_feature(devname):
def is_netfail_master(devname, driver=None):
- """ A device is a "netfail master" device if:
+ """A device is a "netfail master" device if:
- - The device does NOT have the 'master' sysfs attribute
- - The device driver is 'virtio_net'
- - The device has the standby feature bit set
+ - The device does NOT have the 'master' sysfs attribute
+ - The device driver is 'virtio_net'
+ - The device has the standby feature bit set
- Return True if all of the above is True.
+ Return True if all of the above is True.
"""
if get_master(devname) is not None:
return False
@@ -255,17 +269,17 @@ def is_netfail_master(devname, driver=None):
def is_netfail_primary(devname, driver=None):
- """ A device is a "netfail primary" device if:
+ """A device is a "netfail primary" device if:
- - the device has a 'master' sysfs file
- - the device driver is not 'virtio_net'
- - the 'master' sysfs file points to device with virtio_net driver
- - the 'master' device has the 'standby' feature bit set
+ - the device has a 'master' sysfs file
+ - the device driver is not 'virtio_net'
+ - the 'master' sysfs file points to device with virtio_net driver
+ - the 'master' device has the 'standby' feature bit set
- Return True if all of the above is True.
+ Return True if all of the above is True.
"""
# /sys/class/net/<devname>/master -> ../../<master devname>
- master_sysfs_path = sys_dev_path(devname, path='master')
+ master_sysfs_path = sys_dev_path(devname, path="master")
if not os.path.exists(master_sysfs_path):
return False
@@ -288,13 +302,13 @@ def is_netfail_primary(devname, driver=None):
def is_netfail_standby(devname, driver=None):
- """ A device is a "netfail standby" device if:
+ """A device is a "netfail standby" device if:
- - The device has a 'master' sysfs attribute
- - The device driver is 'virtio_net'
- - The device has the standby feature bit set
+ - The device has a 'master' sysfs attribute
+ - The device driver is 'virtio_net'
+ - The device has the standby feature bit set
- Return True if all of the above is True.
+ Return True if all of the above is True.
"""
if get_master(devname) is None:
return False
@@ -320,15 +334,15 @@ def is_renamed(devname):
#define NET_NAME_USER 3 /* provided by user-space */
#define NET_NAME_RENAMED 4 /* renamed by user-space */
"""
- name_assign_type = read_sys_net_safe(devname, 'name_assign_type')
- if name_assign_type and name_assign_type in ['3', '4']:
+ name_assign_type = read_sys_net_safe(devname, "name_assign_type")
+ if name_assign_type and name_assign_type in ["3", "4"]:
return True
return False
def is_vlan(devname):
uevent = str(read_sys_net_safe(devname, "uevent"))
- return 'DEVTYPE=vlan' in uevent.splitlines()
+ return "DEVTYPE=vlan" in uevent.splitlines()
def device_driver(devname):
@@ -372,7 +386,7 @@ class ParserError(Exception):
def is_disabled_cfg(cfg):
if not cfg or not isinstance(cfg, dict):
return False
- return cfg.get('config') == "disabled"
+ return cfg.get("config") == "disabled"
def find_fallback_nic(blacklist_drivers=None):
@@ -386,9 +400,9 @@ def find_fallback_nic(blacklist_drivers=None):
def find_fallback_nic_on_netbsd_or_openbsd(blacklist_drivers=None):
- values = list(sorted(
- get_interfaces_by_mac().values(),
- key=natural_sort_key))
+ values = list(
+ sorted(get_interfaces_by_mac().values(), key=natural_sort_key)
+ )
if values:
return values[0]
@@ -402,7 +416,7 @@ def find_fallback_nic_on_freebsd(blacklist_drivers=None):
we'll use the first interface from ``ifconfig -l -u ether``
"""
- stdout, _stderr = subp.subp(['ifconfig', '-l', '-u', 'ether'])
+ stdout, _stderr = subp.subp(["ifconfig", "-l", "-u", "ether"])
values = stdout.split()
if values:
return values[0]
@@ -419,22 +433,31 @@ def find_fallback_nic_on_linux(blacklist_drivers=None):
if not blacklist_drivers:
blacklist_drivers = []
- if 'net.ifnames=0' in util.get_cmdline():
- LOG.debug('Stable ifnames disabled by net.ifnames=0 in /proc/cmdline')
+ if "net.ifnames=0" in util.get_cmdline():
+ LOG.debug("Stable ifnames disabled by net.ifnames=0 in /proc/cmdline")
else:
- unstable = [device for device in get_devicelist()
- if device != 'lo' and not is_renamed(device)]
+ unstable = [
+ device
+ for device in get_devicelist()
+ if device != "lo" and not is_renamed(device)
+ ]
if len(unstable):
- LOG.debug('Found unstable nic names: %s; calling udevadm settle',
- unstable)
- msg = 'Waiting for udev events to settle'
+ LOG.debug(
+ "Found unstable nic names: %s; calling udevadm settle",
+ unstable,
+ )
+ msg = "Waiting for udev events to settle"
util.log_time(LOG.debug, msg, func=util.udevadm_settle)
# get list of interfaces that could have connections
- invalid_interfaces = set(['lo'])
- potential_interfaces = set([device for device in get_devicelist()
- if device_driver(device) not in
- blacklist_drivers])
+ invalid_interfaces = set(["lo"])
+ potential_interfaces = set(
+ [
+ device
+ for device in get_devicelist()
+ if device_driver(device) not in blacklist_drivers
+ ]
+ )
potential_interfaces = potential_interfaces.difference(invalid_interfaces)
# sort into interfaces with carrier, interfaces which could have carrier,
# and ignore interfaces that are definitely disconnected
@@ -452,19 +475,19 @@ def find_fallback_nic_on_linux(blacklist_drivers=None):
if is_netfailover(interface):
# ignore netfailover primary/standby interfaces
continue
- carrier = read_sys_net_int(interface, 'carrier')
+ carrier = read_sys_net_int(interface, "carrier")
if carrier:
connected.append(interface)
continue
# check if nic is dormant or down, as this may make a nick appear to
# not have a carrier even though it could acquire one when brought
# online by dhclient
- dormant = read_sys_net_int(interface, 'dormant')
+ dormant = read_sys_net_int(interface, "dormant")
if dormant:
possibly_connected.append(interface)
continue
- operstate = read_sys_net_safe(interface, 'operstate')
- if operstate in ['dormant', 'down', 'lowerlayerdown', 'unknown']:
+ operstate = read_sys_net_safe(interface, "operstate")
+ if operstate in ["dormant", "down", "lowerlayerdown", "unknown"]:
possibly_connected.append(interface)
continue
@@ -484,7 +507,7 @@ def find_fallback_nic_on_linux(blacklist_drivers=None):
# pick the first that has a mac-address
for name in names:
- if read_sys_net_safe(name, 'address'):
+ if read_sys_net_safe(name, "address"):
return name
return None
@@ -501,32 +524,32 @@ def generate_fallback_config(blacklist_drivers=None, config_driver=None):
# netfail cannot use mac for matching, they have duplicate macs
if is_netfail_master(target_name):
- match = {'name': target_name}
+ match = {"name": target_name}
else:
match = {
- 'macaddress': read_sys_net_safe(target_name, 'address').lower()}
- cfg = {'dhcp4': True, 'set-name': target_name, 'match': match}
+ "macaddress": read_sys_net_safe(target_name, "address").lower()
+ }
+ cfg = {"dhcp4": True, "set-name": target_name, "match": match}
if config_driver:
driver = device_driver(target_name)
if driver:
- cfg['match']['driver'] = driver
- nconf = {'ethernets': {target_name: cfg}, 'version': 2}
+ cfg["match"]["driver"] = driver
+ nconf = {"ethernets": {target_name: cfg}, "version": 2}
return nconf
def extract_physdevs(netcfg):
-
def _version_1(netcfg):
physdevs = []
- for ent in netcfg.get('config', {}):
- if ent.get('type') != 'physical':
+ for ent in netcfg.get("config", {}):
+ if ent.get("type") != "physical":
continue
- mac = ent.get('mac_address')
+ mac = ent.get("mac_address")
if not mac:
continue
- name = ent.get('name')
- driver = ent.get('params', {}).get('driver')
- device_id = ent.get('params', {}).get('device_id')
+ name = ent.get("name")
+ driver = ent.get("params", {}).get("driver")
+ device_id = ent.get("params", {}).get("device_id")
if not driver:
driver = device_driver(name)
if not device_id:
@@ -536,17 +559,17 @@ def extract_physdevs(netcfg):
def _version_2(netcfg):
physdevs = []
- for ent in netcfg.get('ethernets', {}).values():
+ for ent in netcfg.get("ethernets", {}).values():
# only rename if configured to do so
- name = ent.get('set-name')
+ name = ent.get("set-name")
if not name:
continue
# cloud-init requires macaddress for renaming
- mac = ent.get('match', {}).get('macaddress')
+ mac = ent.get("match", {}).get("macaddress")
if not mac:
continue
- driver = ent.get('match', {}).get('driver')
- device_id = ent.get('match', {}).get('device_id')
+ driver = ent.get("match", {}).get("driver")
+ device_id = ent.get("match", {}).get("device_id")
if not driver:
driver = device_driver(name)
if not device_id:
@@ -554,13 +577,13 @@ def extract_physdevs(netcfg):
physdevs.append([mac, name, driver, device_id])
return physdevs
- version = netcfg.get('version')
+ version = netcfg.get("version")
if version == 1:
return _version_1(netcfg)
elif version == 2:
return _version_2(netcfg)
- raise RuntimeError('Unknown network config version: %s' % version)
+ raise RuntimeError("Unknown network config version: %s" % version)
def apply_network_config_names(netcfg, strict_present=True, strict_busy=True):
@@ -577,7 +600,7 @@ def apply_network_config_names(netcfg, strict_present=True, strict_busy=True):
_rename_interfaces(extract_physdevs(netcfg))
except RuntimeError as e:
raise RuntimeError(
- 'Failed to apply network config names: %s' % e
+ "Failed to apply network config names: %s" % e
) from e
@@ -619,33 +642,37 @@ def _get_current_rename_info(check_downable=True):
cur_info = {}
for (name, mac, driver, device_id) in get_interfaces():
cur_info[name] = {
- 'downable': None,
- 'device_id': device_id,
- 'driver': driver,
- 'mac': mac.lower(),
- 'name': name,
- 'up': is_up(name),
+ "downable": None,
+ "device_id": device_id,
+ "driver": driver,
+ "mac": mac.lower(),
+ "name": name,
+ "up": is_up(name),
}
if check_downable:
nmatch = re.compile(r"[0-9]+:\s+(\w+)[@:]")
- ipv6, _err = subp.subp(['ip', '-6', 'addr', 'show', 'permanent',
- 'scope', 'global'], capture=True)
- ipv4, _err = subp.subp(['ip', '-4', 'addr', 'show'], capture=True)
+ ipv6, _err = subp.subp(
+ ["ip", "-6", "addr", "show", "permanent", "scope", "global"],
+ capture=True,
+ )
+ ipv4, _err = subp.subp(["ip", "-4", "addr", "show"], capture=True)
nics_with_addresses = set()
for bytes_out in (ipv6, ipv4):
nics_with_addresses.update(nmatch.findall(bytes_out))
for d in cur_info.values():
- d['downable'] = (d['up'] is False or
- d['name'] not in nics_with_addresses)
+ d["downable"] = (
+ d["up"] is False or d["name"] not in nics_with_addresses
+ )
return cur_info
-def _rename_interfaces(renames, strict_present=True, strict_busy=True,
- current_info=None):
+def _rename_interfaces(
+ renames, strict_present=True, strict_busy=True, current_info=None
+):
if not len(renames):
LOG.debug("no interfaces to rename")
@@ -657,16 +684,15 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
cur_info = {}
for name, data in current_info.items():
cur = data.copy()
- if cur.get('mac'):
- cur['mac'] = cur['mac'].lower()
- cur['name'] = name
+ if cur.get("mac"):
+ cur["mac"] = cur["mac"].lower()
+ cur["name"] = name
cur_info[name] = cur
LOG.debug("Detected interfaces %s", cur_info)
def update_byname(bymac):
- return dict((data['name'], data)
- for data in cur_info.values())
+ return dict((data["name"], data) for data in cur_info.values())
def rename(cur, new):
subp.subp(["ip", "link", "set", cur, "name", new], capture=True)
@@ -687,25 +713,31 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
def entry_match(data, mac, driver, device_id):
"""match if set and in data"""
if mac and driver and device_id:
- return (data['mac'] == mac and
- data['driver'] == driver and
- data['device_id'] == device_id)
+ return (
+ data["mac"] == mac
+ and data["driver"] == driver
+ and data["device_id"] == device_id
+ )
elif mac and driver:
- return (data['mac'] == mac and
- data['driver'] == driver)
+ return data["mac"] == mac and data["driver"] == driver
elif mac:
- return (data['mac'] == mac)
+ return data["mac"] == mac
return False
def find_entry(mac, driver, device_id):
- match = [data for data in cur_info.values()
- if entry_match(data, mac, driver, device_id)]
+ match = [
+ data
+ for data in cur_info.values()
+ if entry_match(data, mac, driver, device_id)
+ ]
if len(match):
if len(match) > 1:
- msg = ('Failed to match a single device. Matched devices "%s"'
- ' with search values "(mac:%s driver:%s device_id:%s)"'
- % (match, mac, driver, device_id))
+ msg = (
+ 'Failed to match a single device. Matched devices "%s"'
+ ' with search values "(mac:%s driver:%s device_id:%s)"'
+ % (match, mac, driver, device_id)
+ )
raise ValueError(msg)
return match[0]
@@ -720,10 +752,11 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
if strict_present:
errors.append(
"[nic not present] Cannot rename mac=%s to %s"
- ", not available." % (mac, new_name))
+ ", not available." % (mac, new_name)
+ )
continue
- cur_name = cur.get('name')
+ cur_name = cur.get("name")
if cur_name == new_name:
# nothing to do
continue
@@ -732,24 +765,25 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
if strict_present:
errors.append(
"[nic not present] Cannot rename mac=%s to %s"
- ", not available." % (mac, new_name))
+ ", not available." % (mac, new_name)
+ )
continue
- if cur['up']:
+ if cur["up"]:
msg = "[busy] Error renaming mac=%s from %s to %s"
- if not cur['downable']:
+ if not cur["downable"]:
if strict_busy:
errors.append(msg % (mac, cur_name, new_name))
continue
- cur['up'] = False
+ cur["up"] = False
cur_ops.append(("down", mac, new_name, (cur_name,)))
ups.append(("up", mac, new_name, (new_name,)))
if new_name in cur_byname:
target = cur_byname[new_name]
- if target['up']:
+ if target["up"]:
msg = "[busy-target] Error renaming mac=%s from %s to %s."
- if not target['downable']:
+ if not target["downable"]:
if strict_busy:
errors.append(msg % (mac, cur_name, new_name))
continue
@@ -762,17 +796,17 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
tmp_name = tmpname_fmt % tmpi
cur_ops.append(("rename", mac, new_name, (new_name, tmp_name)))
- target['name'] = tmp_name
+ target["name"] = tmp_name
cur_byname = update_byname(cur_info)
- if target['up']:
+ if target["up"]:
ups.append(("up", mac, new_name, (tmp_name,)))
- cur_ops.append(("rename", mac, new_name, (cur['name'], new_name)))
- cur['name'] = new_name
+ cur_ops.append(("rename", mac, new_name, (cur["name"], new_name)))
+ cur["name"] = new_name
cur_byname = update_byname(cur_info)
ops += cur_ops
- opmap = {'rename': rename, 'down': down, 'up': up}
+ opmap = {"rename": rename, "down": down, "up": up}
if len(ops) + len(ups) == 0:
if len(errors):
@@ -787,11 +821,12 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
opmap.get(op)(*params)
except Exception as e:
errors.append(
- "[unknown] Error performing %s%s for %s, %s: %s" %
- (op, params, mac, new_name, e))
+ "[unknown] Error performing %s%s for %s, %s: %s"
+ % (op, params, mac, new_name, e)
+ )
if len(errors):
- raise Exception('\n'.join(errors))
+ raise Exception("\n".join(errors))
def get_interface_mac(ifname):
@@ -810,7 +845,7 @@ def get_ib_interface_hwaddr(ifname, ethernet_format):
representation of the address will be returned.
"""
# Type 32 is Infiniband.
- if read_sys_net_safe(ifname, 'type') == '32':
+ if read_sys_net_safe(ifname, "type") == "32":
mac = get_interface_mac(ifname)
if mac and ethernet_format:
# Use bytes 13-15 and 18-20 of the hardware address.
@@ -821,26 +856,30 @@ def get_ib_interface_hwaddr(ifname, ethernet_format):
def get_interfaces_by_mac(blacklist_drivers=None) -> dict:
if util.is_FreeBSD() or util.is_DragonFlyBSD():
return get_interfaces_by_mac_on_freebsd(
- blacklist_drivers=blacklist_drivers)
+ blacklist_drivers=blacklist_drivers
+ )
elif util.is_NetBSD():
return get_interfaces_by_mac_on_netbsd(
- blacklist_drivers=blacklist_drivers)
+ blacklist_drivers=blacklist_drivers
+ )
elif util.is_OpenBSD():
return get_interfaces_by_mac_on_openbsd(
- blacklist_drivers=blacklist_drivers)
+ blacklist_drivers=blacklist_drivers
+ )
else:
return get_interfaces_by_mac_on_linux(
- blacklist_drivers=blacklist_drivers)
+ blacklist_drivers=blacklist_drivers
+ )
def get_interfaces_by_mac_on_freebsd(blacklist_drivers=None) -> dict():
- (out, _) = subp.subp(['ifconfig', '-a', 'ether'])
+ (out, _) = subp.subp(["ifconfig", "-a", "ether"])
# flatten each interface block in a single line
def flatten(out):
- curr_block = ''
- for line in out.split('\n'):
- if line.startswith('\t'):
+ curr_block = ""
+ for line in out.split("\n"):
+ if line.startswith("\t"):
curr_block += line
else:
if curr_block:
@@ -852,10 +891,11 @@ def get_interfaces_by_mac_on_freebsd(blacklist_drivers=None) -> dict():
def find_mac(flat_list):
for block in flat_list:
m = re.search(
- r"^(?P<ifname>\S*): .*ether\s(?P<mac>[\da-f:]{17}).*",
- block)
+ r"^(?P<ifname>\S*): .*ether\s(?P<mac>[\da-f:]{17}).*", block
+ )
if m:
- yield (m.group('mac'), m.group('ifname'))
+ yield (m.group("mac"), m.group("ifname"))
+
results = {mac: ifname for mac, ifname in find_mac(flatten(out))}
return results
@@ -866,13 +906,13 @@ def get_interfaces_by_mac_on_netbsd(blacklist_drivers=None) -> dict():
r"(?P<ifname>\w+).*address:\s"
r"(?P<mac>([\da-f]{2}[:-]){5}([\da-f]{2})).*"
)
- (out, _) = subp.subp(['ifconfig', '-a'])
- if_lines = re.sub(r'\n\s+', ' ', out).splitlines()
+ (out, _) = subp.subp(["ifconfig", "-a"])
+ if_lines = re.sub(r"\n\s+", " ", out).splitlines()
for line in if_lines:
m = re.match(re_field_match, line)
if m:
fields = m.groupdict()
- ret[fields['mac']] = fields['ifname']
+ ret[fields["mac"]] = fields["ifname"]
return ret
@@ -880,14 +920,15 @@ def get_interfaces_by_mac_on_openbsd(blacklist_drivers=None) -> dict():
ret = {}
re_field_match = (
r"(?P<ifname>\w+).*lladdr\s"
- r"(?P<mac>([\da-f]{2}[:-]){5}([\da-f]{2})).*")
- (out, _) = subp.subp(['ifconfig', '-a'])
- if_lines = re.sub(r'\n\s+', ' ', out).splitlines()
+ r"(?P<mac>([\da-f]{2}[:-]){5}([\da-f]{2})).*"
+ )
+ (out, _) = subp.subp(["ifconfig", "-a"])
+ if_lines = re.sub(r"\n\s+", " ", out).splitlines()
for line in if_lines:
m = re.match(re_field_match, line)
if m:
fields = m.groupdict()
- ret[fields['mac']] = fields['ifname']
+ ret[fields["mac"]] = fields["ifname"]
return ret
@@ -897,11 +938,13 @@ def get_interfaces_by_mac_on_linux(blacklist_drivers=None) -> dict:
Bridges and any devices that have a 'stolen' mac are excluded."""
ret = {}
for name, mac, _driver, _devid in get_interfaces(
- blacklist_drivers=blacklist_drivers):
+ blacklist_drivers=blacklist_drivers
+ ):
if mac in ret:
raise RuntimeError(
- "duplicate mac found! both '%s' and '%s' have mac '%s'" %
- (name, ret[mac], mac))
+ "duplicate mac found! both '%s' and '%s' have mac '%s'"
+ % (name, ret[mac], mac)
+ )
ret[mac] = name
# Try to get an Infiniband hardware address (in 6 byte Ethernet format)
# for the interface.
@@ -909,8 +952,9 @@ def get_interfaces_by_mac_on_linux(blacklist_drivers=None) -> dict:
if ib_mac:
if ib_mac in ret:
raise RuntimeError(
- "duplicate mac found! both '%s' and '%s' have mac '%s'" %
- (name, ret[ib_mac], ib_mac))
+ "duplicate mac found! both '%s' and '%s' have mac '%s'"
+ % (name, ret[ib_mac], ib_mac)
+ )
ret[ib_mac] = name
return ret
@@ -924,7 +968,7 @@ def get_interfaces(blacklist_drivers=None) -> list:
blacklist_drivers = []
devs = get_devicelist()
# 16 somewhat arbitrarily chosen. Normally a mac is 6 '00:' tokens.
- zero_mac = ':'.join(('00',) * 16)
+ zero_mac = ":".join(("00",) * 16)
for name in devs:
if not interface_has_own_mac(name):
continue
@@ -935,8 +979,9 @@ def get_interfaces(blacklist_drivers=None) -> list:
if is_bond(name):
continue
if get_master(name) is not None:
- if (not master_is_bridge_or_bond(name) and
- not master_is_openvswitch(name)):
+ if not master_is_bridge_or_bond(
+ name
+ ) and not master_is_openvswitch(name):
continue
if is_netfailover(name):
continue
@@ -945,7 +990,7 @@ def get_interfaces(blacklist_drivers=None) -> list:
if not mac:
continue
# skip nics that have no mac (00:00....)
- if name != 'lo' and mac == zero_mac[:len(mac)]:
+ if name != "lo" and mac == zero_mac[: len(mac)]:
continue
if is_openvswitch_internal_interface(name):
continue
@@ -966,8 +1011,9 @@ def get_ib_hwaddrs_by_interface():
if ib_mac:
if ib_mac in ret:
raise RuntimeError(
- "duplicate mac found! both '%s' and '%s' have mac '%s'" %
- (name, ret[ib_mac], ib_mac))
+ "duplicate mac found! both '%s' and '%s' have mac '%s'"
+ % (name, ret[ib_mac], ib_mac)
+ )
ret[name] = ib_mac
return ret
@@ -985,18 +1031,21 @@ def has_url_connectivity(url_data: Dict[str, Any]) -> bool:
"timeout": 10
})
"""
- if 'url' not in url_data:
+ if "url" not in url_data:
LOG.warning(
- "Ignoring connectivity check. No 'url' to check in %s", url_data)
+ "Ignoring connectivity check. No 'url' to check in %s", url_data
+ )
return False
- url = url_data['url']
- if not any([url.startswith('http://'), url.startswith('https://')]):
+ url = url_data["url"]
+ if not any([url.startswith("http://"), url.startswith("https://")]):
LOG.warning(
"Ignoring connectivity check. Expected URL beginning with http*://"
- " received '%s'", url)
+ " received '%s'",
+ url,
+ )
return False
- if 'timeout' not in url_data:
- url_data['timeout'] = 5
+ if "timeout" not in url_data:
+ url_data["timeout"] = 5
try:
readurl(**url_data)
except UrlError:
@@ -1047,9 +1096,16 @@ class EphemeralIPv4Network(object):
context exit, clean up the interface leaving no configuration behind.
"""
- def __init__(self, interface, ip, prefix_or_mask, broadcast, router=None,
- connectivity_url_data: Dict[str, Any] = None,
- static_routes=None):
+ def __init__(
+ self,
+ interface,
+ ip,
+ prefix_or_mask,
+ broadcast,
+ router=None,
+ connectivity_url_data: Dict[str, Any] = None,
+ static_routes=None,
+ ):
"""Setup context manager and validate call signature.
@param interface: Name of the network interface to bring up.
@@ -1064,14 +1120,14 @@ class EphemeralIPv4Network(object):
"""
if not all([interface, ip, prefix_or_mask, broadcast]):
raise ValueError(
- 'Cannot init network on {0} with {1}/{2} and bcast {3}'.format(
- interface, ip, prefix_or_mask, broadcast))
+ "Cannot init network on {0} with {1}/{2} and bcast {3}".format(
+ interface, ip, prefix_or_mask, broadcast
+ )
+ )
try:
self.prefix = mask_to_net_prefix(prefix_or_mask)
except ValueError as e:
- raise ValueError(
- 'Cannot setup network: {0}'.format(e)
- ) from e
+ raise ValueError("Cannot setup network: {0}".format(e)) from e
self.connectivity_url_data = connectivity_url_data
self.interface = interface
@@ -1086,8 +1142,10 @@ class EphemeralIPv4Network(object):
if self.connectivity_url_data:
if has_url_connectivity(self.connectivity_url_data):
LOG.debug(
- 'Skip ephemeral network setup, instance has connectivity'
- ' to %s', self.connectivity_url_data['url'])
+ "Skip ephemeral network setup, instance has connectivity"
+ " to %s",
+ self.connectivity_url_data["url"],
+ )
return
self._bringup_device()
@@ -1116,38 +1174,92 @@ class EphemeralIPv4Network(object):
def _delete_address(self, address, prefix):
"""Perform the ip command to remove the specified address."""
subp.subp(
- ['ip', '-family', 'inet', 'addr', 'del',
- '%s/%s' % (address, prefix), 'dev', self.interface],
- capture=True)
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "del",
+ "%s/%s" % (address, prefix),
+ "dev",
+ self.interface,
+ ],
+ capture=True,
+ )
def _bringup_device(self):
"""Perform the ip comands to fully setup the device."""
- cidr = '{0}/{1}'.format(self.ip, self.prefix)
+ cidr = "{0}/{1}".format(self.ip, self.prefix)
LOG.debug(
- 'Attempting setup of ephemeral network on %s with %s brd %s',
- self.interface, cidr, self.broadcast)
+ "Attempting setup of ephemeral network on %s with %s brd %s",
+ self.interface,
+ cidr,
+ self.broadcast,
+ )
try:
subp.subp(
- ['ip', '-family', 'inet', 'addr', 'add', cidr, 'broadcast',
- self.broadcast, 'dev', self.interface],
- capture=True, update_env={'LANG': 'C'})
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ cidr,
+ "broadcast",
+ self.broadcast,
+ "dev",
+ self.interface,
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ )
except subp.ProcessExecutionError as e:
if "File exists" not in e.stderr:
raise
LOG.debug(
- 'Skip ephemeral network setup, %s already has address %s',
- self.interface, self.ip)
+ "Skip ephemeral network setup, %s already has address %s",
+ self.interface,
+ self.ip,
+ )
else:
# Address creation success, bring up device and queue cleanup
subp.subp(
- ['ip', '-family', 'inet', 'link', 'set', 'dev', self.interface,
- 'up'], capture=True)
+ [
+ "ip",
+ "-family",
+ "inet",
+ "link",
+ "set",
+ "dev",
+ self.interface,
+ "up",
+ ],
+ capture=True,
+ )
self.cleanup_cmds.append(
- ['ip', '-family', 'inet', 'link', 'set', 'dev', self.interface,
- 'down'])
+ [
+ "ip",
+ "-family",
+ "inet",
+ "link",
+ "set",
+ "dev",
+ self.interface,
+ "down",
+ ]
+ )
self.cleanup_cmds.append(
- ['ip', '-family', 'inet', 'addr', 'del', cidr, 'dev',
- self.interface])
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "del",
+ cidr,
+ "dev",
+ self.interface,
+ ]
+ )
def _bringup_static_routes(self):
# static_routes = [("169.254.169.254/32", "130.56.248.255"),
@@ -1155,35 +1267,76 @@ class EphemeralIPv4Network(object):
for net_address, gateway in self.static_routes:
via_arg = []
if gateway != "0.0.0.0":
- via_arg = ['via', gateway]
+ via_arg = ["via", gateway]
subp.subp(
- ['ip', '-4', 'route', 'append', net_address] + via_arg +
- ['dev', self.interface], capture=True)
+ ["ip", "-4", "route", "append", net_address]
+ + via_arg
+ + ["dev", self.interface],
+ capture=True,
+ )
self.cleanup_cmds.insert(
- 0, ['ip', '-4', 'route', 'del', net_address] + via_arg +
- ['dev', self.interface])
+ 0,
+ ["ip", "-4", "route", "del", net_address]
+ + via_arg
+ + ["dev", self.interface],
+ )
def _bringup_router(self):
"""Perform the ip commands to fully setup the router if needed."""
# Check if a default route exists and exit if it does
- out, _ = subp.subp(['ip', 'route', 'show', '0.0.0.0/0'], capture=True)
- if 'default' in out:
+ out, _ = subp.subp(["ip", "route", "show", "0.0.0.0/0"], capture=True)
+ if "default" in out:
LOG.debug(
- 'Skip ephemeral route setup. %s already has default route: %s',
- self.interface, out.strip())
+ "Skip ephemeral route setup. %s already has default route: %s",
+ self.interface,
+ out.strip(),
+ )
return
subp.subp(
- ['ip', '-4', 'route', 'add', self.router, 'dev', self.interface,
- 'src', self.ip], capture=True)
+ [
+ "ip",
+ "-4",
+ "route",
+ "add",
+ self.router,
+ "dev",
+ self.interface,
+ "src",
+ self.ip,
+ ],
+ capture=True,
+ )
self.cleanup_cmds.insert(
0,
- ['ip', '-4', 'route', 'del', self.router, 'dev', self.interface,
- 'src', self.ip])
+ [
+ "ip",
+ "-4",
+ "route",
+ "del",
+ self.router,
+ "dev",
+ self.interface,
+ "src",
+ self.ip,
+ ],
+ )
subp.subp(
- ['ip', '-4', 'route', 'add', 'default', 'via', self.router,
- 'dev', self.interface], capture=True)
+ [
+ "ip",
+ "-4",
+ "route",
+ "add",
+ "default",
+ "via",
+ self.router,
+ "dev",
+ self.interface,
+ ],
+ capture=True,
+ )
self.cleanup_cmds.insert(
- 0, ['ip', '-4', 'route', 'del', 'default', 'dev', self.interface])
+ 0, ["ip", "-4", "route", "del", "default", "dev", self.interface]
+ )
class RendererNotFoundError(RuntimeError):
diff --git a/cloudinit/net/activators.py b/cloudinit/net/activators.py
index 137338d8..e80c26df 100644
--- a/cloudinit/net/activators.py
+++ b/cloudinit/net/activators.py
@@ -4,15 +4,13 @@ import os
from abc import ABC, abstractmethod
from typing import Iterable, List, Type
-from cloudinit import subp
-from cloudinit import util
+from cloudinit import subp, util
from cloudinit.net.eni import available as eni_available
from cloudinit.net.netplan import available as netplan_available
-from cloudinit.net.networkd import available as networkd_available
from cloudinit.net.network_state import NetworkState
+from cloudinit.net.networkd import available as networkd_available
from cloudinit.net.sysconfig import NM_CFG_FILE
-
LOG = logging.getLogger(__name__)
@@ -25,8 +23,7 @@ def _alter_interface(cmd, device_name) -> bool:
try:
(_out, err) = subp.subp(cmd)
if len(err):
- LOG.warning("Running %s resulted in stderr output: %s",
- cmd, err)
+ LOG.warning("Running %s resulted in stderr output: %s", cmd, err)
return True
except subp.ProcessExecutionError:
util.logexc(LOG, "Running interface command %s failed", cmd)
@@ -73,7 +70,7 @@ class NetworkActivator(ABC):
Return True is successful, otherwise return False
"""
return cls.bring_up_interfaces(
- [i['name'] for i in network_state.iter_interfaces()]
+ [i["name"] for i in network_state.iter_interfaces()]
)
@classmethod
@@ -91,7 +88,7 @@ class NetworkActivator(ABC):
Return True is successful, otherwise return False
"""
return cls.bring_down_interfaces(
- [i['name'] for i in network_state.iter_interfaces()]
+ [i["name"] for i in network_state.iter_interfaces()]
)
@@ -111,7 +108,7 @@ class IfUpDownActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- cmd = ['ifup', device_name]
+ cmd = ["ifup", device_name]
return _alter_interface(cmd, device_name)
@staticmethod
@@ -120,18 +117,18 @@ class IfUpDownActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- cmd = ['ifdown', device_name]
+ cmd = ["ifdown", device_name]
return _alter_interface(cmd, device_name)
class NetworkManagerActivator(NetworkActivator):
@staticmethod
def available(target=None) -> bool:
- """ Return true if network manager can be used on this system."""
+ """Return true if network manager can be used on this system."""
config_present = os.path.isfile(
subp.target_path(target, path=NM_CFG_FILE)
)
- nmcli_present = subp.which('nmcli', target=target)
+ nmcli_present = subp.which("nmcli", target=target)
return config_present and bool(nmcli_present)
@staticmethod
@@ -140,7 +137,7 @@ class NetworkManagerActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- cmd = ['nmcli', 'connection', 'up', 'ifname', device_name]
+ cmd = ["nmcli", "connection", "up", "ifname", device_name]
return _alter_interface(cmd, device_name)
@staticmethod
@@ -149,16 +146,16 @@ class NetworkManagerActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- cmd = ['nmcli', 'connection', 'down', device_name]
+ cmd = ["nmcli", "connection", "down", device_name]
return _alter_interface(cmd, device_name)
class NetplanActivator(NetworkActivator):
- NETPLAN_CMD = ['netplan', 'apply']
+ NETPLAN_CMD = ["netplan", "apply"]
@staticmethod
def available(target=None) -> bool:
- """ Return true if netplan can be used on this system."""
+ """Return true if netplan can be used on this system."""
return netplan_available(target=target)
@staticmethod
@@ -167,9 +164,11 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- LOG.debug("Calling 'netplan apply' rather than "
- "altering individual interfaces")
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ LOG.debug(
+ "Calling 'netplan apply' rather than "
+ "altering individual interfaces"
+ )
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
@staticmethod
def bring_up_interfaces(device_names: Iterable[str]) -> bool:
@@ -177,9 +176,11 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- LOG.debug("Calling 'netplan apply' rather than "
- "altering individual interfaces")
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ LOG.debug(
+ "Calling 'netplan apply' rather than "
+ "altering individual interfaces"
+ )
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
@staticmethod
def bring_up_all_interfaces(network_state: NetworkState) -> bool:
@@ -187,7 +188,7 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
@staticmethod
def bring_down_interface(device_name: str) -> bool:
@@ -195,9 +196,11 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- LOG.debug("Calling 'netplan apply' rather than "
- "altering individual interfaces")
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ LOG.debug(
+ "Calling 'netplan apply' rather than "
+ "altering individual interfaces"
+ )
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
@staticmethod
def bring_down_interfaces(device_names: Iterable[str]) -> bool:
@@ -205,9 +208,11 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- LOG.debug("Calling 'netplan apply' rather than "
- "altering individual interfaces")
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ LOG.debug(
+ "Calling 'netplan apply' rather than "
+ "altering individual interfaces"
+ )
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
@staticmethod
def bring_down_all_interfaces(network_state: NetworkState) -> bool:
@@ -215,7 +220,7 @@ class NetplanActivator(NetworkActivator):
Return True is successful, otherwise return False
"""
- return _alter_interface(NetplanActivator.NETPLAN_CMD, 'all')
+ return _alter_interface(NetplanActivator.NETPLAN_CMD, "all")
class NetworkdActivator(NetworkActivator):
@@ -226,20 +231,20 @@ class NetworkdActivator(NetworkActivator):
@staticmethod
def bring_up_interface(device_name: str) -> bool:
- """ Return True is successful, otherwise return False """
- cmd = ['ip', 'link', 'set', 'up', device_name]
+ """Return True is successful, otherwise return False"""
+ cmd = ["ip", "link", "set", "up", device_name]
return _alter_interface(cmd, device_name)
@staticmethod
def bring_up_all_interfaces(network_state: NetworkState) -> bool:
- """ Return True is successful, otherwise return False """
- cmd = ['systemctl', 'restart', 'systemd-networkd', 'systemd-resolved']
- return _alter_interface(cmd, 'all')
+ """Return True is successful, otherwise return False"""
+ cmd = ["systemctl", "restart", "systemd-networkd", "systemd-resolved"]
+ return _alter_interface(cmd, "all")
@staticmethod
def bring_down_interface(device_name: str) -> bool:
- """ Return True is successful, otherwise return False """
- cmd = ['ip', 'link', 'set', 'down', device_name]
+ """Return True is successful, otherwise return False"""
+ cmd = ["ip", "link", "set", "down", device_name]
return _alter_interface(cmd, device_name)
@@ -262,7 +267,8 @@ def search_activator(
unknown = [i for i in priority if i not in DEFAULT_PRIORITY]
if unknown:
raise ValueError(
- "Unknown activators provided in priority list: %s" % unknown)
+ "Unknown activators provided in priority list: %s" % unknown
+ )
return [activator for activator in priority if activator.available(target)]
@@ -277,7 +283,8 @@ def select_activator(priority=None, target=None) -> Type[NetworkActivator]:
tmsg = " in target=%s" % target
raise NoActivatorException(
"No available network activators found%s. Searched "
- "through list: %s" % (tmsg, priority))
+ "through list: %s" % (tmsg, priority)
+ )
selected = found[0]
- LOG.debug('Using selected activator: %s', selected)
+ LOG.debug("Using selected activator: %s", selected)
return selected
diff --git a/cloudinit/net/bsd.py b/cloudinit/net/bsd.py
index 916cea32..dc322582 100644
--- a/cloudinit/net/bsd.py
+++ b/cloudinit/net/bsd.py
@@ -3,11 +3,9 @@
import re
from cloudinit import log as logging
-from cloudinit import net
-from cloudinit import util
-from cloudinit import subp
-from cloudinit.distros.parsers.resolv_conf import ResolvConf
+from cloudinit import net, subp, util
from cloudinit.distros import bsd_utils
+from cloudinit.distros.parsers.resolv_conf import ResolvConf
from . import renderer
@@ -15,8 +13,8 @@ LOG = logging.getLogger(__name__)
class BSDRenderer(renderer.Renderer):
- resolv_conf_fn = 'etc/resolv.conf'
- rc_conf_fn = 'etc/rc.conf'
+ resolv_conf_fn = "etc/resolv.conf"
+ rc_conf_fn = "etc/rc.conf"
def get_rc_config_value(self, key):
fn = subp.target_path(self.target, self.rc_conf_fn)
@@ -31,52 +29,59 @@ class BSDRenderer(renderer.Renderer):
config = {}
self.target = None
self.interface_configurations = {}
- self._postcmds = config.get('postcmds', True)
+ self._postcmds = config.get("postcmds", True)
def _ifconfig_entries(self, settings):
ifname_by_mac = net.get_interfaces_by_mac()
for interface in settings.iter_interfaces():
device_name = interface.get("name")
device_mac = interface.get("mac_address")
- if device_name and re.match(r'^lo\d+$', device_name):
+ if device_name and re.match(r"^lo\d+$", device_name):
continue
if device_mac not in ifname_by_mac:
- LOG.info('Cannot find any device with MAC %s', device_mac)
+ LOG.info("Cannot find any device with MAC %s", device_mac)
elif device_mac and device_name:
cur_name = ifname_by_mac[device_mac]
if cur_name != device_name:
- LOG.info('netif service will rename interface %s to %s',
- cur_name, device_name)
+ LOG.info(
+ "netif service will rename interface %s to %s",
+ cur_name,
+ device_name,
+ )
try:
self.rename_interface(cur_name, device_name)
except NotImplementedError:
- LOG.error((
- 'Interface renaming is '
- 'not supported on this OS'))
+ LOG.error(
+ "Interface renaming is not supported on this OS"
+ )
device_name = cur_name
else:
device_name = ifname_by_mac[device_mac]
- LOG.info('Configuring interface %s', device_name)
+ LOG.info("Configuring interface %s", device_name)
- self.interface_configurations[device_name] = 'DHCP'
+ self.interface_configurations[device_name] = "DHCP"
for subnet in interface.get("subnets", []):
- if subnet.get('type') == 'static':
- if not subnet.get('netmask'):
+ if subnet.get("type") == "static":
+ if not subnet.get("netmask"):
LOG.debug(
- 'Skipping IP %s, because there is no netmask',
- subnet.get('address')
+ "Skipping IP %s, because there is no netmask",
+ subnet.get("address"),
)
continue
- LOG.debug('Configuring dev %s with %s / %s', device_name,
- subnet.get('address'), subnet.get('netmask'))
+ LOG.debug(
+ "Configuring dev %s with %s / %s",
+ device_name,
+ subnet.get("address"),
+ subnet.get("netmask"),
+ )
self.interface_configurations[device_name] = {
- 'address': subnet.get('address'),
- 'netmask': subnet.get('netmask'),
- 'mtu': subnet.get('mtu') or interface.get('mtu'),
+ "address": subnet.get("address"),
+ "netmask": subnet.get("netmask"),
+ "mtu": subnet.get("mtu") or interface.get("mtu"),
}
def _route_entries(self, settings):
@@ -84,22 +89,25 @@ class BSDRenderer(renderer.Renderer):
for interface in settings.iter_interfaces():
subnets = interface.get("subnets", [])
for subnet in subnets:
- if subnet.get('type') != 'static':
+ if subnet.get("type") != "static":
continue
- gateway = subnet.get('gateway')
- if gateway and len(gateway.split('.')) == 4:
- routes.append({
- 'network': '0.0.0.0',
- 'netmask': '0.0.0.0',
- 'gateway': gateway})
- routes += subnet.get('routes', [])
+ gateway = subnet.get("gateway")
+ if gateway and len(gateway.split(".")) == 4:
+ routes.append(
+ {
+ "network": "0.0.0.0",
+ "netmask": "0.0.0.0",
+ "gateway": gateway,
+ }
+ )
+ routes += subnet.get("routes", [])
for route in routes:
- network = route.get('network')
+ network = route.get("network")
if not network:
- LOG.debug('Skipping a bad route entry')
+ LOG.debug("Skipping a bad route entry")
continue
- netmask = route.get('netmask')
- gateway = route.get('gateway')
+ netmask = route.get("netmask")
+ gateway = route.get("gateway")
self.set_route(network, netmask, gateway)
def _resolve_conf(self, settings):
@@ -107,20 +115,26 @@ class BSDRenderer(renderer.Renderer):
searchdomains = settings.dns_searchdomains
for interface in settings.iter_interfaces():
for subnet in interface.get("subnets", []):
- if 'dns_nameservers' in subnet:
- nameservers.extend(subnet['dns_nameservers'])
- if 'dns_search' in subnet:
- searchdomains.extend(subnet['dns_search'])
+ if "dns_nameservers" in subnet:
+ nameservers.extend(subnet["dns_nameservers"])
+ if "dns_search" in subnet:
+ searchdomains.extend(subnet["dns_search"])
# Try to read the /etc/resolv.conf or just start from scratch if that
# fails.
try:
- resolvconf = ResolvConf(util.load_file(subp.target_path(
- self.target, self.resolv_conf_fn)))
+ resolvconf = ResolvConf(
+ util.load_file(
+ subp.target_path(self.target, self.resolv_conf_fn)
+ )
+ )
resolvconf.parse()
except IOError:
- util.logexc(LOG, "Failed to parse %s, use new empty file",
- subp.target_path(self.target, self.resolv_conf_fn))
- resolvconf = ResolvConf('')
+ util.logexc(
+ LOG,
+ "Failed to parse %s, use new empty file",
+ subp.target_path(self.target, self.resolv_conf_fn),
+ )
+ resolvconf = ResolvConf("")
resolvconf.parse()
# Add some nameservers
@@ -138,7 +152,9 @@ class BSDRenderer(renderer.Renderer):
util.logexc(LOG, "Failed to add search domain %s", domain)
util.write_file(
subp.target_path(self.target, self.resolv_conf_fn),
- str(resolvconf), 0o644)
+ str(resolvconf),
+ 0o644,
+ )
def render_network_state(self, network_state, templates=None, target=None):
if target:
@@ -152,7 +168,7 @@ class BSDRenderer(renderer.Renderer):
def dhcp_interfaces(self):
ic = self.interface_configurations.items
- return [k for k, v in ic() if v == 'DHCP']
+ return [k for k, v in ic() if v == "DHCP"]
def start_services(self, run=False):
raise NotImplementedError()
diff --git a/cloudinit/net/cmdline.py b/cloudinit/net/cmdline.py
index 7cdd428d..eab86d9f 100755
--- a/cloudinit/net/cmdline.py
+++ b/cloudinit/net/cmdline.py
@@ -16,8 +16,7 @@ import shlex
from cloudinit import util
-from . import get_devicelist
-from . import read_sys_net_safe
+from . import get_devicelist, read_sys_net_safe
_OPEN_ISCSI_INTERFACE_FILE = "/run/initramfs/open-iscsi.interface"
@@ -58,7 +57,7 @@ class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
if self._mac_addrs is None:
self._mac_addrs = {}
for k in get_devicelist():
- mac_addr = read_sys_net_safe(k, 'address')
+ mac_addr = read_sys_net_safe(k, "address")
if mac_addr:
self._mac_addrs[k] = mac_addr
@@ -74,7 +73,7 @@ class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
"""
if self._files:
for item in shlex.split(self._cmdline):
- if item.startswith('ip=') or item.startswith('ip6='):
+ if item.startswith("ip=") or item.startswith("ip6="):
return True
if os.path.exists(_OPEN_ISCSI_INTERFACE_FILE):
# iBft can configure networking without ip=
@@ -83,7 +82,8 @@ class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
def render_config(self) -> dict:
return config_from_klibc_net_cfg(
- files=self._files, mac_addrs=self._mac_addrs,
+ files=self._files,
+ mac_addrs=self._mac_addrs,
)
@@ -113,78 +113,78 @@ def _klibc_to_config_entry(content, mac_addrs=None):
data = util.load_shell_content(content)
try:
- name = data['DEVICE'] if 'DEVICE' in data else data['DEVICE6']
+ name = data["DEVICE"] if "DEVICE" in data else data["DEVICE6"]
except KeyError as e:
raise ValueError("no 'DEVICE' or 'DEVICE6' entry in data") from e
# ipconfig on precise does not write PROTO
# IPv6 config gives us IPV6PROTO, not PROTO.
- proto = data.get('PROTO', data.get('IPV6PROTO'))
+ proto = data.get("PROTO", data.get("IPV6PROTO"))
if not proto:
- if data.get('filename'):
- proto = 'dhcp'
+ if data.get("filename"):
+ proto = "dhcp"
else:
- proto = 'none'
+ proto = "none"
- if proto not in ('none', 'dhcp', 'dhcp6'):
+ if proto not in ("none", "dhcp", "dhcp6"):
raise ValueError("Unexpected value for PROTO: %s" % proto)
iface = {
- 'type': 'physical',
- 'name': name,
- 'subnets': [],
+ "type": "physical",
+ "name": name,
+ "subnets": [],
}
if name in mac_addrs:
- iface['mac_address'] = mac_addrs[name]
+ iface["mac_address"] = mac_addrs[name]
# Handle both IPv4 and IPv6 values
- for pre in ('IPV4', 'IPV6'):
+ for pre in ("IPV4", "IPV6"):
# if no IPV4ADDR or IPV6ADDR, then go on.
if pre + "ADDR" not in data:
continue
# PROTO for ipv4, IPV6PROTO for ipv6
- cur_proto = data.get(pre + 'PROTO', proto)
+ cur_proto = data.get(pre + "PROTO", proto)
# ipconfig's 'none' is called 'static'
- if cur_proto == 'none':
- cur_proto = 'static'
- subnet = {'type': cur_proto, 'control': 'manual'}
+ if cur_proto == "none":
+ cur_proto = "static"
+ subnet = {"type": cur_proto, "control": "manual"}
# only populate address for static types. While the rendered config
# may have an address for dhcp, that is not really expected.
- if cur_proto == 'static':
- subnet['address'] = data[pre + 'ADDR']
+ if cur_proto == "static":
+ subnet["address"] = data[pre + "ADDR"]
# these fields go right on the subnet
- for key in ('NETMASK', 'BROADCAST', 'GATEWAY'):
+ for key in ("NETMASK", "BROADCAST", "GATEWAY"):
if pre + key in data:
subnet[key.lower()] = data[pre + key]
dns = []
# handle IPV4DNS0 or IPV6DNS0
- for nskey in ('DNS0', 'DNS1'):
+ for nskey in ("DNS0", "DNS1"):
ns = data.get(pre + nskey)
# verify it has something other than 0.0.0.0 (or ipv6)
if ns and len(ns.strip(":.0")):
dns.append(data[pre + nskey])
if dns:
- subnet['dns_nameservers'] = dns
+ subnet["dns_nameservers"] = dns
# add search to both ipv4 and ipv6, as it has no namespace
- search = data.get('DOMAINSEARCH')
+ search = data.get("DOMAINSEARCH")
if search:
- if ',' in search:
- subnet['dns_search'] = search.split(",")
+ if "," in search:
+ subnet["dns_search"] = search.split(",")
else:
- subnet['dns_search'] = search.split()
+ subnet["dns_search"] = search.split()
- iface['subnets'].append(subnet)
+ iface["subnets"].append(subnet)
return name, iface
def _get_klibc_net_cfg_files():
- return glob.glob('/run/net-*.conf') + glob.glob('/run/net6-*.conf')
+ return glob.glob("/run/net-*.conf") + glob.glob("/run/net6-*.conf")
def config_from_klibc_net_cfg(files=None, mac_addrs=None):
@@ -194,24 +194,28 @@ def config_from_klibc_net_cfg(files=None, mac_addrs=None):
entries = []
names = {}
for cfg_file in files:
- name, entry = _klibc_to_config_entry(util.load_file(cfg_file),
- mac_addrs=mac_addrs)
+ name, entry = _klibc_to_config_entry(
+ util.load_file(cfg_file), mac_addrs=mac_addrs
+ )
if name in names:
- prev = names[name]['entry']
- if prev.get('mac_address') != entry.get('mac_address'):
+ prev = names[name]["entry"]
+ if prev.get("mac_address") != entry.get("mac_address"):
raise ValueError(
"device '{name}' was defined multiple times ({files})"
" but had differing mac addresses: {old} -> {new}.".format(
- name=name, files=' '.join(names[name]['files']),
- old=prev.get('mac_address'),
- new=entry.get('mac_address')))
- prev['subnets'].extend(entry['subnets'])
- names[name]['files'].append(cfg_file)
+ name=name,
+ files=" ".join(names[name]["files"]),
+ old=prev.get("mac_address"),
+ new=entry.get("mac_address"),
+ )
+ )
+ prev["subnets"].extend(entry["subnets"])
+ names[name]["files"].append(cfg_file)
else:
- names[name] = {'files': [cfg_file], 'entry': entry}
+ names[name] = {"files": [cfg_file], "entry": entry}
entries.append(entry)
- return {'config': entries, 'version': 1}
+ return {"config": entries, "version": 1}
def read_initramfs_config():
@@ -257,8 +261,10 @@ def _b64dgz(data):
except (TypeError, ValueError):
logging.error(
"Expected base64 encoded kernel commandline parameter"
- " network-config. Ignoring network-config=%s.", data)
- return ''
+ " network-config. Ignoring network-config=%s.",
+ data,
+ )
+ return ""
return _decomp_gzip(blob)
@@ -267,7 +273,7 @@ def read_kernel_cmdline_config(cmdline=None):
if cmdline is None:
cmdline = util.get_cmdline()
- if 'network-config=' in cmdline:
+ if "network-config=" in cmdline:
data64 = None
for tok in cmdline.split():
if tok.startswith("network-config="):
@@ -279,4 +285,5 @@ def read_kernel_cmdline_config(cmdline=None):
return None
+
# vi: ts=4 expandtab
diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py
index 3f4b0418..f9af18cf 100644
--- a/cloudinit/net/dhcp.py
+++ b/cloudinit/net/dhcp.py
@@ -4,26 +4,28 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
-from typing import Dict, Any
-import configobj
import logging
import os
import re
import signal
import time
from io import StringIO
+from typing import Any, Dict
+
+import configobj
+from cloudinit import subp, temp_utils, util
from cloudinit.net import (
- EphemeralIPv4Network, find_fallback_nic, get_devicelist,
- has_url_connectivity)
+ EphemeralIPv4Network,
+ find_fallback_nic,
+ get_devicelist,
+ has_url_connectivity,
+)
from cloudinit.net.network_state import mask_and_ipv4_to_bcast_addr as bcip
-from cloudinit import temp_utils
-from cloudinit import subp
-from cloudinit import util
LOG = logging.getLogger(__name__)
-NETWORKD_LEASES_DIR = '/run/systemd/netif/leases'
+NETWORKD_LEASES_DIR = "/run/systemd/netif/leases"
class InvalidDHCPLeaseFileError(Exception):
@@ -43,7 +45,7 @@ class EphemeralDHCPv4(object):
self,
iface=None,
connectivity_url_data: Dict[str, Any] = None,
- dhcp_log_func=None
+ dhcp_log_func=None,
):
self.iface = iface
self._ephipv4 = None
@@ -57,8 +59,10 @@ class EphemeralDHCPv4(object):
if self.connectivity_url_data:
if has_url_connectivity(self.connectivity_url_data):
LOG.debug(
- 'Skip ephemeral DHCP setup, instance has connectivity'
- ' to %s', self.connectivity_url_data)
+ "Skip ephemeral DHCP setup, instance has connectivity"
+ " to %s",
+ self.connectivity_url_data,
+ )
return
return self.obtain_lease()
@@ -87,31 +91,39 @@ class EphemeralDHCPv4(object):
return self.lease
try:
leases = maybe_perform_dhcp_discovery(
- self.iface, self.dhcp_log_func)
+ self.iface, self.dhcp_log_func
+ )
except InvalidDHCPLeaseFileError as e:
raise NoDHCPLeaseError() from e
if not leases:
raise NoDHCPLeaseError()
self.lease = leases[-1]
- LOG.debug("Received dhcp lease on %s for %s/%s",
- self.lease['interface'], self.lease['fixed-address'],
- self.lease['subnet-mask'])
- nmap = {'interface': 'interface', 'ip': 'fixed-address',
- 'prefix_or_mask': 'subnet-mask',
- 'broadcast': 'broadcast-address',
- 'static_routes': [
- 'rfc3442-classless-static-routes',
- 'classless-static-routes'
- ],
- 'router': 'routers'}
+ LOG.debug(
+ "Received dhcp lease on %s for %s/%s",
+ self.lease["interface"],
+ self.lease["fixed-address"],
+ self.lease["subnet-mask"],
+ )
+ nmap = {
+ "interface": "interface",
+ "ip": "fixed-address",
+ "prefix_or_mask": "subnet-mask",
+ "broadcast": "broadcast-address",
+ "static_routes": [
+ "rfc3442-classless-static-routes",
+ "classless-static-routes",
+ ],
+ "router": "routers",
+ }
kwargs = self.extract_dhcp_options_mapping(nmap)
- if not kwargs['broadcast']:
- kwargs['broadcast'] = bcip(kwargs['prefix_or_mask'], kwargs['ip'])
- if kwargs['static_routes']:
- kwargs['static_routes'] = (
- parse_static_routes(kwargs['static_routes']))
+ if not kwargs["broadcast"]:
+ kwargs["broadcast"] = bcip(kwargs["prefix_or_mask"], kwargs["ip"])
+ if kwargs["static_routes"]:
+ kwargs["static_routes"] = parse_static_routes(
+ kwargs["static_routes"]
+ )
if self.connectivity_url_data:
- kwargs['connectivity_url_data'] = self.connectivity_url_data
+ kwargs["connectivity_url_data"] = self.connectivity_url_data
ephipv4 = EphemeralIPv4Network(**kwargs)
ephipv4.__enter__()
self._ephipv4 = ephipv4
@@ -122,16 +134,15 @@ class EphemeralDHCPv4(object):
for internal_reference, lease_option_names in nmap.items():
if isinstance(lease_option_names, list):
self.get_first_option_value(
- internal_reference,
- lease_option_names,
- result
+ internal_reference, lease_option_names, result
)
else:
result[internal_reference] = self.lease.get(lease_option_names)
return result
- def get_first_option_value(self, internal_mapping,
- lease_option_names, result):
+ def get_first_option_value(
+ self, internal_mapping, lease_option_names, result
+ ):
for different_names in lease_option_names:
if not result.get(internal_mapping):
result[internal_mapping] = self.lease.get(different_names)
@@ -153,19 +164,20 @@ def maybe_perform_dhcp_discovery(nic=None, dhcp_log_func=None):
if nic is None:
nic = find_fallback_nic()
if nic is None:
- LOG.debug('Skip dhcp_discovery: Unable to find fallback nic.')
+ LOG.debug("Skip dhcp_discovery: Unable to find fallback nic.")
return []
elif nic not in get_devicelist():
LOG.debug(
- 'Skip dhcp_discovery: nic %s not found in get_devicelist.', nic)
+ "Skip dhcp_discovery: nic %s not found in get_devicelist.", nic
+ )
return []
- dhclient_path = subp.which('dhclient')
+ dhclient_path = subp.which("dhclient")
if not dhclient_path:
- LOG.debug('Skip dhclient configuration: No dhclient command found.')
+ LOG.debug("Skip dhclient configuration: No dhclient command found.")
return []
- with temp_utils.tempdir(rmtree_ignore_errors=True,
- prefix='cloud-init-dhcp-',
- needs_exe=True) as tdir:
+ with temp_utils.tempdir(
+ rmtree_ignore_errors=True, prefix="cloud-init-dhcp-", needs_exe=True
+ ) as tdir:
# Use /var/tmp because /run/cloud-init/tmp is mounted noexec
return dhcp_discovery(dhclient_path, nic, tdir, dhcp_log_func)
@@ -184,20 +196,23 @@ def parse_dhcp_lease_file(lease_file):
lease_content = util.load_file(lease_file)
if len(lease_content) == 0:
raise InvalidDHCPLeaseFileError(
- 'Cannot parse empty dhcp lease file {0}'.format(lease_file))
+ "Cannot parse empty dhcp lease file {0}".format(lease_file)
+ )
for lease in lease_regex.findall(lease_content):
lease_options = []
- for line in lease.split(';'):
+ for line in lease.split(";"):
# Strip newlines, double-quotes and option prefix
- line = line.strip().replace('"', '').replace('option ', '')
+ line = line.strip().replace('"', "").replace("option ", "")
if not line:
continue
- lease_options.append(line.split(' ', 1))
+ lease_options.append(line.split(" ", 1))
dhcp_leases.append(dict(lease_options))
if not dhcp_leases:
raise InvalidDHCPLeaseFileError(
- 'Cannot parse dhcp lease file {0}. No leases found'.format(
- lease_file))
+ "Cannot parse dhcp lease file {0}. No leases found".format(
+ lease_file
+ )
+ )
return dhcp_leases
@@ -214,17 +229,17 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
@return: A list of dicts of representing the dhcp leases parsed from the
dhcp.leases file or empty list.
"""
- LOG.debug('Performing a dhcp discovery on %s', interface)
+ LOG.debug("Performing a dhcp discovery on %s", interface)
# XXX We copy dhclient out of /sbin/dhclient to avoid dealing with strict
# app armor profiles which disallow running dhclient -sf <our-script-file>.
# We want to avoid running /sbin/dhclient-script because of side-effects in
# /etc/resolv.conf any any other vendor specific scripts in
# /etc/dhcp/dhclient*hooks.d.
- sandbox_dhclient_cmd = os.path.join(cleandir, 'dhclient')
+ sandbox_dhclient_cmd = os.path.join(cleandir, "dhclient")
util.copy(dhclient_cmd_path, sandbox_dhclient_cmd)
- pid_file = os.path.join(cleandir, 'dhclient.pid')
- lease_file = os.path.join(cleandir, 'dhcp.leases')
+ pid_file = os.path.join(cleandir, "dhclient.pid")
+ lease_file = os.path.join(cleandir, "dhcp.leases")
# In some cases files in /var/tmp may not be executable, launching dhclient
# from there will certainly raise 'Permission denied' error. Try launching
@@ -236,9 +251,19 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
# Generally dhclient relies on dhclient-script PREINIT action to bring the
# link up before attempting discovery. Since we are using -sf /bin/true,
# we need to do that "link up" ourselves first.
- subp.subp(['ip', 'link', 'set', 'dev', interface, 'up'], capture=True)
- cmd = [sandbox_dhclient_cmd, '-1', '-v', '-lf', lease_file,
- '-pf', pid_file, interface, '-sf', '/bin/true']
+ subp.subp(["ip", "link", "set", "dev", interface, "up"], capture=True)
+ cmd = [
+ sandbox_dhclient_cmd,
+ "-1",
+ "-v",
+ "-lf",
+ lease_file,
+ "-pf",
+ pid_file,
+ interface,
+ "-sf",
+ "/bin/true",
+ ]
out, err = subp.subp(cmd, capture=True)
# Wait for pid file and lease file to appear, and for the process
@@ -249,13 +274,16 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
# kill the correct process, thus freeing cleandir to be deleted back
# up the callstack.
missing = util.wait_for_files(
- [pid_file, lease_file], maxwait=5, naplen=0.01)
+ [pid_file, lease_file], maxwait=5, naplen=0.01
+ )
if missing:
- LOG.warning("dhclient did not produce expected files: %s",
- ', '.join(os.path.basename(f) for f in missing))
+ LOG.warning(
+ "dhclient did not produce expected files: %s",
+ ", ".join(os.path.basename(f) for f in missing),
+ )
return []
- ppid = 'unknown'
+ ppid = "unknown"
daemonized = False
for _ in range(0, 1000):
pid_content = util.load_file(pid_file).strip()
@@ -266,7 +294,7 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
else:
ppid = util.get_proc_ppid(pid)
if ppid == 1:
- LOG.debug('killing dhclient with pid=%s', pid)
+ LOG.debug("killing dhclient with pid=%s", pid)
os.kill(pid, signal.SIGKILL)
daemonized = True
break
@@ -274,8 +302,11 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
if not daemonized:
LOG.error(
- 'dhclient(pid=%s, parentpid=%s) failed to daemonize after %s '
- 'seconds', pid_content, ppid, 0.01 * 1000
+ "dhclient(pid=%s, parentpid=%s) failed to daemonize after %s "
+ "seconds",
+ pid_content,
+ ppid,
+ 0.01 * 1000,
)
if dhcp_log_func is not None:
dhcp_log_func(out, err)
@@ -307,7 +338,8 @@ def networkd_load_leases(leases_d=None):
return ret
for lfile in os.listdir(leases_d):
ret[lfile] = networkd_parse_lease(
- util.load_file(os.path.join(leases_d, lfile)))
+ util.load_file(os.path.join(leases_d, lfile))
+ )
return ret
@@ -322,7 +354,7 @@ def networkd_get_option_from_leases(keyname, leases_d=None):
def parse_static_routes(rfc3442):
- """ parse rfc3442 format and return a list containing tuple of strings.
+ """parse rfc3442 format and return a list containing tuple of strings.
The tuple is composed of the network_address (including net length) and
gateway for a parsed static route. It can parse two formats of rfc3442,
@@ -352,10 +384,12 @@ def parse_static_routes(rfc3442):
static_routes = []
def _trunc_error(cidr, required, remain):
- msg = ("RFC3442 string malformed. Current route has CIDR of %s "
- "and requires %s significant octets, but only %s remain. "
- "Verify DHCP rfc3442-classless-static-routes value: %s"
- % (cidr, required, remain, rfc3442))
+ msg = (
+ "RFC3442 string malformed. Current route has CIDR of %s "
+ "and requires %s significant octets, but only %s remain. "
+ "Verify DHCP rfc3442-classless-static-routes value: %s"
+ % (cidr, required, remain, rfc3442)
+ )
LOG.error(msg)
current_idx = 0
@@ -368,32 +402,32 @@ def parse_static_routes(rfc3442):
if len(tokens[idx:]) < req_toks:
_trunc_error(net_length, req_toks, len(tokens[idx:]))
return static_routes
- net_address = ".".join(tokens[idx+1:idx+5])
- gateway = ".".join(tokens[idx+5:idx+req_toks])
+ net_address = ".".join(tokens[idx + 1 : idx + 5])
+ gateway = ".".join(tokens[idx + 5 : idx + req_toks])
current_idx = idx + req_toks
elif net_length in range(17, 25):
req_toks = 8
if len(tokens[idx:]) < req_toks:
_trunc_error(net_length, req_toks, len(tokens[idx:]))
return static_routes
- net_address = ".".join(tokens[idx+1:idx+4] + ["0"])
- gateway = ".".join(tokens[idx+4:idx+req_toks])
+ net_address = ".".join(tokens[idx + 1 : idx + 4] + ["0"])
+ gateway = ".".join(tokens[idx + 4 : idx + req_toks])
current_idx = idx + req_toks
elif net_length in range(9, 17):
req_toks = 7
if len(tokens[idx:]) < req_toks:
_trunc_error(net_length, req_toks, len(tokens[idx:]))
return static_routes
- net_address = ".".join(tokens[idx+1:idx+3] + ["0", "0"])
- gateway = ".".join(tokens[idx+3:idx+req_toks])
+ net_address = ".".join(tokens[idx + 1 : idx + 3] + ["0", "0"])
+ gateway = ".".join(tokens[idx + 3 : idx + req_toks])
current_idx = idx + req_toks
elif net_length in range(1, 9):
req_toks = 6
if len(tokens[idx:]) < req_toks:
_trunc_error(net_length, req_toks, len(tokens[idx:]))
return static_routes
- net_address = ".".join(tokens[idx+1:idx+2] + ["0", "0", "0"])
- gateway = ".".join(tokens[idx+2:idx+req_toks])
+ net_address = ".".join(tokens[idx + 1 : idx + 2] + ["0", "0", "0"])
+ gateway = ".".join(tokens[idx + 2 : idx + req_toks])
current_idx = idx + req_toks
elif net_length == 0:
req_toks = 5
@@ -401,15 +435,19 @@ def parse_static_routes(rfc3442):
_trunc_error(net_length, req_toks, len(tokens[idx:]))
return static_routes
net_address = "0.0.0.0"
- gateway = ".".join(tokens[idx+1:idx+req_toks])
+ gateway = ".".join(tokens[idx + 1 : idx + req_toks])
current_idx = idx + req_toks
else:
- LOG.error('Parsed invalid net length "%s". Verify DHCP '
- 'rfc3442-classless-static-routes value.', net_length)
+ LOG.error(
+ 'Parsed invalid net length "%s". Verify DHCP '
+ "rfc3442-classless-static-routes value.",
+ net_length,
+ )
return static_routes
static_routes.append(("%s/%s" % (net_address, net_length), gateway))
return static_routes
+
# vi: ts=4 expandtab
diff --git a/cloudinit/net/eni.py b/cloudinit/net/eni.py
index a89e5ad2..99e3fbb0 100644
--- a/cloudinit/net/eni.py
+++ b/cloudinit/net/eni.py
@@ -5,32 +5,58 @@ import glob
import os
import re
-from . import ParserError
-
-from . import renderer
-from .network_state import subnet_is_ipv6
-
from cloudinit import log as logging
-from cloudinit import subp
-from cloudinit import util
+from cloudinit import subp, util
+from . import ParserError, renderer
+from .network_state import subnet_is_ipv6
LOG = logging.getLogger(__name__)
NET_CONFIG_COMMANDS = [
- "pre-up", "up", "post-up", "down", "pre-down", "post-down",
+ "pre-up",
+ "up",
+ "post-up",
+ "down",
+ "pre-down",
+ "post-down",
]
NET_CONFIG_BRIDGE_OPTIONS = [
- "bridge_ageing", "bridge_bridgeprio", "bridge_fd", "bridge_gcinit",
- "bridge_hello", "bridge_maxage", "bridge_maxwait", "bridge_stp",
+ "bridge_ageing",
+ "bridge_bridgeprio",
+ "bridge_fd",
+ "bridge_gcinit",
+ "bridge_hello",
+ "bridge_maxage",
+ "bridge_maxwait",
+ "bridge_stp",
]
NET_CONFIG_OPTIONS = [
- "address", "netmask", "broadcast", "network", "metric", "gateway",
- "pointtopoint", "media", "mtu", "hostname", "leasehours", "leasetime",
- "vendor", "client", "bootfile", "server", "hwaddr", "provider", "frame",
- "netnum", "endpoint", "local", "ttl",
+ "address",
+ "netmask",
+ "broadcast",
+ "network",
+ "metric",
+ "gateway",
+ "pointtopoint",
+ "media",
+ "mtu",
+ "hostname",
+ "leasehours",
+ "leasetime",
+ "vendor",
+ "client",
+ "bootfile",
+ "server",
+ "hwaddr",
+ "provider",
+ "frame",
+ "netnum",
+ "endpoint",
+ "local",
+ "ttl",
]
@@ -38,27 +64,27 @@ NET_CONFIG_OPTIONS = [
def _iface_add_subnet(iface, subnet):
content = []
valid_map = [
- 'address',
- 'netmask',
- 'broadcast',
- 'metric',
- 'gateway',
- 'pointopoint',
- 'mtu',
- 'scope',
- 'dns_search',
- 'dns_nameservers',
+ "address",
+ "netmask",
+ "broadcast",
+ "metric",
+ "gateway",
+ "pointopoint",
+ "mtu",
+ "scope",
+ "dns_search",
+ "dns_nameservers",
]
for key, value in subnet.items():
- if key == 'netmask':
+ if key == "netmask":
continue
- if key == 'address':
- value = "%s/%s" % (subnet['address'], subnet['prefix'])
+ if key == "address":
+ value = "%s/%s" % (subnet["address"], subnet["prefix"])
if value and key in valid_map:
if type(value) == list:
value = " ".join(value)
- if '_' in key:
- key = key.replace('_', '-')
+ if "_" in key:
+ key = key.replace("_", "-")
content.append(" {0} {1}".format(key, value))
return sorted(content)
@@ -75,41 +101,44 @@ def _iface_add_attrs(iface, index, ipv4_subnet_mtu):
return []
content = []
ignore_map = [
- 'control',
- 'device_id',
- 'driver',
- 'index',
- 'inet',
- 'mode',
- 'name',
- 'subnets',
- 'type',
+ "control",
+ "device_id",
+ "driver",
+ "index",
+ "inet",
+ "mode",
+ "name",
+ "subnets",
+ "type",
]
# The following parameters require repetitive entries of the key for
# each of the values
multiline_keys = [
- 'bridge_pathcost',
- 'bridge_portprio',
- 'bridge_waitport',
+ "bridge_pathcost",
+ "bridge_portprio",
+ "bridge_waitport",
]
- renames = {'mac_address': 'hwaddress'}
- if iface['type'] not in ['bond', 'bridge', 'infiniband', 'vlan']:
- ignore_map.append('mac_address')
+ renames = {"mac_address": "hwaddress"}
+ if iface["type"] not in ["bond", "bridge", "infiniband", "vlan"]:
+ ignore_map.append("mac_address")
for key, value in iface.items():
# convert bool to string for eni
if type(value) == bool:
- value = 'on' if iface[key] else 'off'
+ value = "on" if iface[key] else "off"
if not value or key in ignore_map:
continue
- if key == 'mtu' and ipv4_subnet_mtu:
+ if key == "mtu" and ipv4_subnet_mtu:
if value != ipv4_subnet_mtu:
LOG.warning(
"Network config: ignoring %s device-level mtu:%s because"
" ipv4 subnet-level mtu:%s provided.",
- iface['name'], value, ipv4_subnet_mtu)
+ iface["name"],
+ value,
+ ipv4_subnet_mtu,
+ )
continue
if key in multiline_keys:
for v in value:
@@ -123,9 +152,9 @@ def _iface_add_attrs(iface, index, ipv4_subnet_mtu):
def _iface_start_entry(iface, index, render_hwaddress=False):
- fullname = iface['name']
+ fullname = iface["name"]
- control = iface['control']
+ control = iface["control"]
if control == "auto":
cverb = "auto"
elif control in ("hotplug",):
@@ -134,12 +163,13 @@ def _iface_start_entry(iface, index, render_hwaddress=False):
cverb = "# control-" + control
subst = iface.copy()
- subst.update({'fullname': fullname, 'cverb': cverb})
+ subst.update({"fullname": fullname, "cverb": cverb})
lines = [
"{cverb} {fullname}".format(**subst),
- "iface {fullname} {inet} {mode}".format(**subst)]
- if render_hwaddress and iface.get('mac_address'):
+ "iface {fullname} {inet} {mode}".format(**subst),
+ ]
+ if render_hwaddress and iface.get("mac_address"):
lines.append(" hwaddress {mac_address}".format(**subst))
return lines
@@ -159,9 +189,9 @@ def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
currif = None
for line in contents.splitlines():
line = line.strip()
- if line.startswith('#'):
+ if line.startswith("#"):
continue
- split = line.split(' ')
+ split = line.split(" ")
option = split[0]
if option == "source-directory":
parsed_src_dir = split[1]
@@ -172,16 +202,18 @@ def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
dir_contents = [
os.path.join(expanded_path, path)
for path in dir_contents
- if (os.path.isfile(os.path.join(expanded_path, path)) and
- re.match("^[a-zA-Z0-9_-]+$", path) is not None)
+ if (
+ os.path.isfile(os.path.join(expanded_path, path))
+ and re.match("^[a-zA-Z0-9_-]+$", path) is not None
+ )
]
for entry in dir_contents:
with open(entry, "r") as fp:
src_data = fp.read().strip()
abs_entry = os.path.abspath(entry)
_parse_deb_config_data(
- ifaces, src_data,
- os.path.dirname(abs_entry), abs_entry)
+ ifaces, src_data, os.path.dirname(abs_entry), abs_entry
+ )
elif option == "source":
new_src_path = split[1]
if not new_src_path.startswith("/"):
@@ -191,8 +223,8 @@ def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
src_data = fp.read().strip()
abs_path = os.path.abspath(expanded_path)
_parse_deb_config_data(
- ifaces, src_data,
- os.path.dirname(abs_path), abs_path)
+ ifaces, src_data, os.path.dirname(abs_path), abs_path
+ )
elif option == "auto":
for iface in split[1:]:
if iface not in ifaces:
@@ -200,7 +232,7 @@ def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
# Include the source path this interface was found in.
"_source_path": src_path
}
- ifaces[iface]['auto'] = True
+ ifaces[iface]["auto"] = True
elif option == "iface":
iface, family, method = split[1:4]
if iface not in ifaces:
@@ -208,71 +240,72 @@ def _parse_deb_config_data(ifaces, contents, src_dir, src_path):
# Include the source path this interface was found in.
"_source_path": src_path
}
- elif 'family' in ifaces[iface]:
+ elif "family" in ifaces[iface]:
raise ParserError(
"Interface %s can only be defined once. "
- "Re-defined in '%s'." % (iface, src_path))
- ifaces[iface]['family'] = family
- ifaces[iface]['method'] = method
+ "Re-defined in '%s'." % (iface, src_path)
+ )
+ ifaces[iface]["family"] = family
+ ifaces[iface]["method"] = method
currif = iface
elif option == "hwaddress":
if split[1] == "ether":
val = split[2]
else:
val = split[1]
- ifaces[currif]['hwaddress'] = val
+ ifaces[currif]["hwaddress"] = val
elif option in NET_CONFIG_OPTIONS:
ifaces[currif][option] = split[1]
elif option in NET_CONFIG_COMMANDS:
if option not in ifaces[currif]:
ifaces[currif][option] = []
- ifaces[currif][option].append(' '.join(split[1:]))
- elif option.startswith('dns-'):
- if 'dns' not in ifaces[currif]:
- ifaces[currif]['dns'] = {}
- if option == 'dns-search':
- ifaces[currif]['dns']['search'] = []
+ ifaces[currif][option].append(" ".join(split[1:]))
+ elif option.startswith("dns-"):
+ if "dns" not in ifaces[currif]:
+ ifaces[currif]["dns"] = {}
+ if option == "dns-search":
+ ifaces[currif]["dns"]["search"] = []
for domain in split[1:]:
- ifaces[currif]['dns']['search'].append(domain)
- elif option == 'dns-nameservers':
- ifaces[currif]['dns']['nameservers'] = []
+ ifaces[currif]["dns"]["search"].append(domain)
+ elif option == "dns-nameservers":
+ ifaces[currif]["dns"]["nameservers"] = []
for server in split[1:]:
- ifaces[currif]['dns']['nameservers'].append(server)
- elif option.startswith('bridge_'):
- if 'bridge' not in ifaces[currif]:
- ifaces[currif]['bridge'] = {}
+ ifaces[currif]["dns"]["nameservers"].append(server)
+ elif option.startswith("bridge_"):
+ if "bridge" not in ifaces[currif]:
+ ifaces[currif]["bridge"] = {}
if option in NET_CONFIG_BRIDGE_OPTIONS:
- bridge_option = option.replace('bridge_', '', 1)
- ifaces[currif]['bridge'][bridge_option] = split[1]
+ bridge_option = option.replace("bridge_", "", 1)
+ ifaces[currif]["bridge"][bridge_option] = split[1]
elif option == "bridge_ports":
- ifaces[currif]['bridge']['ports'] = []
+ ifaces[currif]["bridge"]["ports"] = []
for iface in split[1:]:
- ifaces[currif]['bridge']['ports'].append(iface)
+ ifaces[currif]["bridge"]["ports"].append(iface)
elif option == "bridge_hw":
# doc is confusing and thus some may put literal 'MAC'
# bridge_hw MAC <address>
# but correct is:
# bridge_hw <address>
if split[1].lower() == "mac":
- ifaces[currif]['bridge']['mac'] = split[2]
+ ifaces[currif]["bridge"]["mac"] = split[2]
else:
- ifaces[currif]['bridge']['mac'] = split[1]
+ ifaces[currif]["bridge"]["mac"] = split[1]
elif option == "bridge_pathcost":
- if 'pathcost' not in ifaces[currif]['bridge']:
- ifaces[currif]['bridge']['pathcost'] = {}
- ifaces[currif]['bridge']['pathcost'][split[1]] = split[2]
+ if "pathcost" not in ifaces[currif]["bridge"]:
+ ifaces[currif]["bridge"]["pathcost"] = {}
+ ifaces[currif]["bridge"]["pathcost"][split[1]] = split[2]
elif option == "bridge_portprio":
- if 'portprio' not in ifaces[currif]['bridge']:
- ifaces[currif]['bridge']['portprio'] = {}
- ifaces[currif]['bridge']['portprio'][split[1]] = split[2]
- elif option.startswith('bond-'):
- if 'bond' not in ifaces[currif]:
- ifaces[currif]['bond'] = {}
- bond_option = option.replace('bond-', '', 1)
- ifaces[currif]['bond'][bond_option] = split[1]
+ if "portprio" not in ifaces[currif]["bridge"]:
+ ifaces[currif]["bridge"]["portprio"] = {}
+ ifaces[currif]["bridge"]["portprio"][split[1]] = split[2]
+ elif option.startswith("bond-"):
+ if "bond" not in ifaces[currif]:
+ ifaces[currif]["bond"] = {}
+ bond_option = option.replace("bond-", "", 1)
+ ifaces[currif]["bond"][bond_option] = split[1]
for iface in ifaces.keys():
- if 'auto' not in ifaces[iface]:
- ifaces[iface]['auto'] = False
+ if "auto" not in ifaces[iface]:
+ ifaces[iface]["auto"] = False
def parse_deb_config(path):
@@ -282,8 +315,8 @@ def parse_deb_config(path):
contents = fp.read().strip()
abs_path = os.path.abspath(path)
_parse_deb_config_data(
- ifaces, contents,
- os.path.dirname(abs_path), abs_path)
+ ifaces, contents, os.path.dirname(abs_path), abs_path
+ )
return ifaces
@@ -308,32 +341,31 @@ def _ifaces_to_net_config_data(ifaces):
dtype = "loopback"
else:
dtype = "physical"
- devs[devname] = {'type': dtype, 'name': devname, 'subnets': []}
+ devs[devname] = {"type": dtype, "name": devname, "subnets": []}
# this isnt strictly correct, but some might specify
# hwaddress on a nic for matching / declaring name.
- if 'hwaddress' in data:
- devs[devname]['mac_address'] = data['hwaddress']
- subnet = {'_orig_eni_name': name, 'type': data['method']}
- if data.get('auto'):
- subnet['control'] = 'auto'
+ if "hwaddress" in data:
+ devs[devname]["mac_address"] = data["hwaddress"]
+ subnet = {"_orig_eni_name": name, "type": data["method"]}
+ if data.get("auto"):
+ subnet["control"] = "auto"
else:
- subnet['control'] = 'manual'
+ subnet["control"] = "manual"
- if data.get('method') == 'static':
- subnet['address'] = data['address']
+ if data.get("method") == "static":
+ subnet["address"] = data["address"]
- for copy_key in ('netmask', 'gateway', 'broadcast'):
+ for copy_key in ("netmask", "gateway", "broadcast"):
if copy_key in data:
subnet[copy_key] = data[copy_key]
- if 'dns' in data:
- for n in ('nameservers', 'search'):
- if n in data['dns'] and data['dns'][n]:
- subnet['dns_' + n] = data['dns'][n]
- devs[devname]['subnets'].append(subnet)
+ if "dns" in data:
+ for n in ("nameservers", "search"):
+ if n in data["dns"] and data["dns"][n]:
+ subnet["dns_" + n] = data["dns"][n]
+ devs[devname]["subnets"].append(subnet)
- return {'version': 1,
- 'config': [devs[d] for d in sorted(devs)]}
+ return {"version": 1, "config": [devs[d] for d in sorted(devs)]}
class Renderer(renderer.Renderer):
@@ -342,10 +374,11 @@ class Renderer(renderer.Renderer):
def __init__(self, config=None):
if not config:
config = {}
- self.eni_path = config.get('eni_path', 'etc/network/interfaces')
- self.eni_header = config.get('eni_header', None)
+ self.eni_path = config.get("eni_path", "etc/network/interfaces")
+ self.eni_header = config.get("eni_header", None)
self.netrules_path = config.get(
- 'netrules_path', 'etc/udev/rules.d/70-persistent-net.rules')
+ "netrules_path", "etc/udev/rules.d/70-persistent-net.rules"
+ )
def _render_route(self, route, indent=""):
"""When rendering routes for an iface, in some cases applying a route
@@ -367,153 +400,166 @@ class Renderer(renderer.Renderer):
down = indent + "pre-down route del"
or_true = " || true"
mapping = {
- 'gateway': 'gw',
- 'metric': 'metric',
+ "gateway": "gw",
+ "metric": "metric",
}
- default_gw = ''
- if route['network'] == '0.0.0.0' and route['netmask'] == '0.0.0.0':
- default_gw = ' default'
- elif route['network'] == '::' and route['prefix'] == 0:
- default_gw = ' -A inet6 default'
+ default_gw = ""
+ if route["network"] == "0.0.0.0" and route["netmask"] == "0.0.0.0":
+ default_gw = " default"
+ elif route["network"] == "::" and route["prefix"] == 0:
+ default_gw = " -A inet6 default"
- route_line = ''
- for k in ['network', 'gateway', 'metric']:
- if default_gw and k == 'network':
+ route_line = ""
+ for k in ["network", "gateway", "metric"]:
+ if default_gw and k == "network":
continue
- if k == 'gateway':
- route_line += '%s %s %s' % (default_gw, mapping[k], route[k])
+ if k == "gateway":
+ route_line += "%s %s %s" % (default_gw, mapping[k], route[k])
elif k in route:
- if k == 'network':
- if ':' in route[k]:
- route_line += ' -A inet6'
- elif route.get('prefix') == 32:
- route_line += ' -host'
+ if k == "network":
+ if ":" in route[k]:
+ route_line += " -A inet6"
+ elif route.get("prefix") == 32:
+ route_line += " -host"
else:
- route_line += ' -net'
- if 'prefix' in route:
- route_line += ' %s/%s' % (route[k], route['prefix'])
+ route_line += " -net"
+ if "prefix" in route:
+ route_line += " %s/%s" % (route[k], route["prefix"])
else:
- route_line += ' %s %s' % (mapping[k], route[k])
+ route_line += " %s %s" % (mapping[k], route[k])
content.append(up + route_line + or_true)
content.append(down + route_line + or_true)
return content
def _render_iface(self, iface, render_hwaddress=False):
sections = []
- subnets = iface.get('subnets', {})
- accept_ra = iface.pop('accept-ra', None)
- ethernet_wol = iface.pop('wakeonlan', None)
+ subnets = iface.get("subnets", {})
+ accept_ra = iface.pop("accept-ra", None)
+ ethernet_wol = iface.pop("wakeonlan", None)
if ethernet_wol:
# Specify WOL setting 'g' for using "Magic Packet"
- iface['ethernet-wol'] = 'g'
+ iface["ethernet-wol"] = "g"
if subnets:
for index, subnet in enumerate(subnets):
ipv4_subnet_mtu = None
- iface['index'] = index
- iface['mode'] = subnet['type']
- iface['control'] = subnet.get('control', 'auto')
- subnet_inet = 'inet'
+ iface["index"] = index
+ iface["mode"] = subnet["type"]
+ iface["control"] = subnet.get("control", "auto")
+ subnet_inet = "inet"
if subnet_is_ipv6(subnet):
- subnet_inet += '6'
+ subnet_inet += "6"
else:
- ipv4_subnet_mtu = subnet.get('mtu')
- iface['inet'] = subnet_inet
- if (subnet['type'] == 'dhcp4' or subnet['type'] == 'dhcp6' or
- subnet['type'] == 'ipv6_dhcpv6-stateful'):
+ ipv4_subnet_mtu = subnet.get("mtu")
+ iface["inet"] = subnet_inet
+ if (
+ subnet["type"] == "dhcp4"
+ or subnet["type"] == "dhcp6"
+ or subnet["type"] == "ipv6_dhcpv6-stateful"
+ ):
# Configure network settings using DHCP or DHCPv6
- iface['mode'] = 'dhcp'
+ iface["mode"] = "dhcp"
if accept_ra is not None:
# Accept router advertisements (0=off, 1=on)
- iface['accept_ra'] = '1' if accept_ra else '0'
- elif subnet['type'] == 'ipv6_dhcpv6-stateless':
+ iface["accept_ra"] = "1" if accept_ra else "0"
+ elif subnet["type"] == "ipv6_dhcpv6-stateless":
# Configure network settings using SLAAC from RAs
- iface['mode'] = 'auto'
+ iface["mode"] = "auto"
# Use stateless DHCPv6 (0=off, 1=on)
- iface['dhcp'] = '1'
- elif subnet['type'] == 'ipv6_slaac':
+ iface["dhcp"] = "1"
+ elif subnet["type"] == "ipv6_slaac":
# Configure network settings using SLAAC from RAs
- iface['mode'] = 'auto'
+ iface["mode"] = "auto"
# Use stateless DHCPv6 (0=off, 1=on)
- iface['dhcp'] = '0'
+ iface["dhcp"] = "0"
elif subnet_is_ipv6(subnet):
# mode might be static6, eni uses 'static'
- iface['mode'] = 'static'
+ iface["mode"] = "static"
if accept_ra is not None:
# Accept router advertisements (0=off, 1=on)
- iface['accept_ra'] = '1' if accept_ra else '0'
+ iface["accept_ra"] = "1" if accept_ra else "0"
# do not emit multiple 'auto $IFACE' lines as older (precise)
# ifupdown complains
- if True in ["auto %s" % (iface['name']) in line
- for line in sections]:
- iface['control'] = 'alias'
+ if True in [
+ "auto %s" % (iface["name"]) in line for line in sections
+ ]:
+ iface["control"] = "alias"
lines = list(
_iface_start_entry(
- iface, index, render_hwaddress=render_hwaddress) +
- _iface_add_subnet(iface, subnet) +
- _iface_add_attrs(iface, index, ipv4_subnet_mtu)
+ iface, index, render_hwaddress=render_hwaddress
+ )
+ + _iface_add_subnet(iface, subnet)
+ + _iface_add_attrs(iface, index, ipv4_subnet_mtu)
)
- for route in subnet.get('routes', []):
+ for route in subnet.get("routes", []):
lines.extend(self._render_route(route, indent=" "))
sections.append(lines)
else:
# ifenslave docs say to auto the slave devices
lines = []
- if 'bond-master' in iface or 'bond-slaves' in iface:
+ if "bond-master" in iface or "bond-slaves" in iface:
lines.append("auto {name}".format(**iface))
lines.append("iface {name} {inet} {mode}".format(**iface))
lines.extend(
- _iface_add_attrs(iface, index=0, ipv4_subnet_mtu=None))
+ _iface_add_attrs(iface, index=0, ipv4_subnet_mtu=None)
+ )
sections.append(lines)
return sections
def _render_interfaces(self, network_state, render_hwaddress=False):
- '''Given state, emit etc/network/interfaces content.'''
+ """Given state, emit etc/network/interfaces content."""
# handle 'lo' specifically as we need to insert the global dns entries
# there (as that is the only interface that will be always up).
- lo = {'name': 'lo', 'type': 'physical', 'inet': 'inet',
- 'subnets': [{'type': 'loopback', 'control': 'auto'}]}
+ lo = {
+ "name": "lo",
+ "type": "physical",
+ "inet": "inet",
+ "subnets": [{"type": "loopback", "control": "auto"}],
+ }
for iface in network_state.iter_interfaces():
- if iface.get('name') == "lo":
+ if iface.get("name") == "lo":
lo = copy.deepcopy(iface)
nameservers = network_state.dns_nameservers
if nameservers:
- lo['subnets'][0]["dns_nameservers"] = (" ".join(nameservers))
+ lo["subnets"][0]["dns_nameservers"] = " ".join(nameservers)
searchdomains = network_state.dns_searchdomains
if searchdomains:
- lo['subnets'][0]["dns_search"] = (" ".join(searchdomains))
+ lo["subnets"][0]["dns_search"] = " ".join(searchdomains)
# Apply a sort order to ensure that we write out the physical
# interfaces first; this is critical for bonding
order = {
- 'loopback': 0,
- 'physical': 1,
- 'infiniband': 2,
- 'bond': 3,
- 'bridge': 4,
- 'vlan': 5,
+ "loopback": 0,
+ "physical": 1,
+ "infiniband": 2,
+ "bond": 3,
+ "bridge": 4,
+ "vlan": 5,
}
sections = []
sections.extend(self._render_iface(lo))
- for iface in sorted(network_state.iter_interfaces(),
- key=lambda k: (order[k['type']], k['name'])):
+ for iface in sorted(
+ network_state.iter_interfaces(),
+ key=lambda k: (order[k["type"]], k["name"]),
+ ):
- if iface.get('name') == "lo":
+ if iface.get("name") == "lo":
continue
sections.extend(
- self._render_iface(iface, render_hwaddress=render_hwaddress))
+ self._render_iface(iface, render_hwaddress=render_hwaddress)
+ )
for route in network_state.iter_routes():
sections.append(self._render_route(route))
- return '\n\n'.join(['\n'.join(s) for s in sections]) + "\n"
+ return "\n\n".join(["\n".join(s) for s in sections]) + "\n"
def render_network_state(self, network_state, templates=None, target=None):
fpeni = subp.target_path(target, self.eni_path)
@@ -524,34 +570,38 @@ class Renderer(renderer.Renderer):
if self.netrules_path:
netrules = subp.target_path(target, self.netrules_path)
util.ensure_dir(os.path.dirname(netrules))
- util.write_file(netrules,
- self._render_persistent_net(network_state))
+ util.write_file(
+ netrules, self._render_persistent_net(network_state)
+ )
def network_state_to_eni(network_state, header=None, render_hwaddress=False):
# render the provided network state, return a string of equivalent eni
- eni_path = 'etc/network/interfaces'
- renderer = Renderer(config={
- 'eni_path': eni_path,
- 'eni_header': header,
- 'netrules_path': None,
- })
+ eni_path = "etc/network/interfaces"
+ renderer = Renderer(
+ config={
+ "eni_path": eni_path,
+ "eni_header": header,
+ "netrules_path": None,
+ }
+ )
if not header:
header = ""
if not header.endswith("\n"):
header += "\n"
contents = renderer._render_interfaces(
- network_state, render_hwaddress=render_hwaddress)
+ network_state, render_hwaddress=render_hwaddress
+ )
return header + contents
def available(target=None):
- expected = ['ifquery', 'ifup', 'ifdown']
- search = ['/sbin', '/usr/sbin']
+ expected = ["ifquery", "ifup", "ifdown"]
+ search = ["/sbin", "/usr/sbin"]
for p in expected:
if not subp.which(p, search=search, target=target):
return False
- eni = subp.target_path(target, 'etc/network/interfaces')
+ eni = subp.target_path(target, "etc/network/interfaces")
if not os.path.isfile(eni):
return False
diff --git a/cloudinit/net/freebsd.py b/cloudinit/net/freebsd.py
index f8faf240..ec42b60c 100644
--- a/cloudinit/net/freebsd.py
+++ b/cloudinit/net/freebsd.py
@@ -1,31 +1,29 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import log as logging
import cloudinit.net.bsd
-from cloudinit import subp
-from cloudinit import util
+from cloudinit import log as logging
+from cloudinit import subp, util
LOG = logging.getLogger(__name__)
class Renderer(cloudinit.net.bsd.BSDRenderer):
-
def __init__(self, config=None):
self._route_cpt = 0
super(Renderer, self).__init__()
def rename_interface(self, cur_name, device_name):
- self.set_rc_config_value('ifconfig_%s_name' % cur_name, device_name)
+ self.set_rc_config_value("ifconfig_%s_name" % cur_name, device_name)
def write_config(self):
for device_name, v in self.interface_configurations.items():
- net_config = 'DHCP'
+ net_config = "DHCP"
if isinstance(v, dict):
- net_config = v.get('address') + ' netmask ' + v.get('netmask')
- mtu = v.get('mtu')
+ net_config = v.get("address") + " netmask " + v.get("netmask")
+ mtu = v.get("mtu")
if mtu:
- net_config += (' mtu %d' % mtu)
- self.set_rc_config_value('ifconfig_' + device_name, net_config)
+ net_config += " mtu %d" % mtu
+ self.set_rc_config_value("ifconfig_" + device_name, net_config)
def start_services(self, run=False):
if not run:
@@ -35,29 +33,33 @@ class Renderer(cloudinit.net.bsd.BSDRenderer):
for dhcp_interface in self.dhcp_interfaces():
# Observed on DragonFlyBSD 6. If we use the "restart" parameter,
# the routes are not recreated.
- subp.subp(['service', 'dhclient', 'stop', dhcp_interface],
- rcs=[0, 1],
- capture=True)
+ subp.subp(
+ ["service", "dhclient", "stop", dhcp_interface],
+ rcs=[0, 1],
+ capture=True,
+ )
- subp.subp(['service', 'netif', 'restart'], capture=True)
+ subp.subp(["service", "netif", "restart"], capture=True)
# On FreeBSD 10, the restart of routing and dhclient is likely to fail
# because
# - routing: it cannot remove the loopback route, but it will still set
# up the default route as expected.
# - dhclient: it cannot stop the dhclient started by the netif service.
# In both case, the situation is ok, and we can proceed.
- subp.subp(['service', 'routing', 'restart'], capture=True, rcs=[0, 1])
+ subp.subp(["service", "routing", "restart"], capture=True, rcs=[0, 1])
for dhcp_interface in self.dhcp_interfaces():
- subp.subp(['service', 'dhclient', 'start', dhcp_interface],
- rcs=[0, 1],
- capture=True)
+ subp.subp(
+ ["service", "dhclient", "start", dhcp_interface],
+ rcs=[0, 1],
+ capture=True,
+ )
def set_route(self, network, netmask, gateway):
- if network == '0.0.0.0':
- self.set_rc_config_value('defaultrouter', gateway)
+ if network == "0.0.0.0":
+ self.set_rc_config_value("defaultrouter", gateway)
else:
- route_name = 'route_net%d' % self._route_cpt
+ route_name = "route_net%d" % self._route_cpt
route_cmd = "-route %s/%s %s" % (network, netmask, gateway)
self.set_rc_config_value(route_name, route_cmd)
self._route_cpt += 1
diff --git a/cloudinit/net/netbsd.py b/cloudinit/net/netbsd.py
index 5f8881a5..3d6b85b7 100644
--- a/cloudinit/net/netbsd.py
+++ b/cloudinit/net/netbsd.py
@@ -1,45 +1,42 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import log as logging
-from cloudinit import subp
-from cloudinit import util
import cloudinit.net.bsd
+from cloudinit import log as logging
+from cloudinit import subp, util
LOG = logging.getLogger(__name__)
class Renderer(cloudinit.net.bsd.BSDRenderer):
-
def __init__(self, config=None):
super(Renderer, self).__init__()
def write_config(self):
if self.dhcp_interfaces():
- self.set_rc_config_value('dhcpcd', 'YES')
+ self.set_rc_config_value("dhcpcd", "YES")
self.set_rc_config_value(
- 'dhcpcd_flags',
- ' '.join(self.dhcp_interfaces())
+ "dhcpcd_flags", " ".join(self.dhcp_interfaces())
)
for device_name, v in self.interface_configurations.items():
if isinstance(v, dict):
- net_config = v.get('address') + ' netmask ' + v.get('netmask')
- mtu = v.get('mtu')
+ net_config = v.get("address") + " netmask " + v.get("netmask")
+ mtu = v.get("mtu")
if mtu:
- net_config += (' mtu %d' % mtu)
- self.set_rc_config_value('ifconfig_' + device_name, net_config)
+ net_config += " mtu %d" % mtu
+ self.set_rc_config_value("ifconfig_" + device_name, net_config)
def start_services(self, run=False):
if not run:
LOG.debug("netbsd generate postcmd disabled")
return
- subp.subp(['service', 'network', 'restart'], capture=True)
+ subp.subp(["service", "network", "restart"], capture=True)
if self.dhcp_interfaces():
- subp.subp(['service', 'dhcpcd', 'restart'], capture=True)
+ subp.subp(["service", "dhcpcd", "restart"], capture=True)
def set_route(self, network, netmask, gateway):
- if network == '0.0.0.0':
- self.set_rc_config_value('defaultroute', gateway)
+ if network == "0.0.0.0":
+ self.set_rc_config_value("defaultroute", gateway)
def available(target=None):
diff --git a/cloudinit/net/netplan.py b/cloudinit/net/netplan.py
index 41acf963..57ba2d9a 100644
--- a/cloudinit/net/netplan.py
+++ b/cloudinit/net/netplan.py
@@ -3,20 +3,18 @@
import copy
import os
+from cloudinit import log as logging
+from cloudinit import safeyaml, subp, util
+from cloudinit.net import SYS_CLASS_NET, get_devicelist
+
from . import renderer
from .network_state import (
+ IPV6_DYNAMIC_TYPES,
+ NET_CONFIG_TO_V2,
NetworkState,
subnet_is_ipv6,
- NET_CONFIG_TO_V2,
- IPV6_DYNAMIC_TYPES,
)
-from cloudinit import log as logging
-from cloudinit import util
-from cloudinit import subp
-from cloudinit import safeyaml
-from cloudinit.net import SYS_CLASS_NET, get_devicelist
-
KNOWN_SNAPD_CONFIG = b"""\
# This is the initial network config.
# It can be overwritten by cloud-init or console-conf.
@@ -37,8 +35,11 @@ LOG = logging.getLogger(__name__)
def _get_params_dict_by_match(config, match):
- return dict((key, value) for (key, value) in config.items()
- if key.startswith(match))
+ return dict(
+ (key, value)
+ for (key, value) in config.items()
+ if key.startswith(match)
+ )
def _extract_addresses(config, entry, ifname, features=None):
@@ -78,14 +79,16 @@ def _extract_addresses(config, entry, ifname, features=None):
"""
- def _listify(obj, token=' '):
+ def _listify(obj, token=" "):
"Helper to convert strings to list of strings, handle single string"
if not obj or type(obj) not in [str]:
return obj
if token in obj:
return obj.split(token)
else:
- return [obj, ]
+ return [
+ obj,
+ ]
if features is None:
features = []
@@ -93,78 +96,85 @@ def _extract_addresses(config, entry, ifname, features=None):
routes = []
nameservers = []
searchdomains = []
- subnets = config.get('subnets', [])
+ subnets = config.get("subnets", [])
if subnets is None:
subnets = []
for subnet in subnets:
- sn_type = subnet.get('type')
- if sn_type.startswith('dhcp'):
- if sn_type == 'dhcp':
- sn_type += '4'
+ sn_type = subnet.get("type")
+ if sn_type.startswith("dhcp"):
+ if sn_type == "dhcp":
+ sn_type += "4"
entry.update({sn_type: True})
elif sn_type in IPV6_DYNAMIC_TYPES:
- entry.update({'dhcp6': True})
- elif sn_type in ['static', 'static6']:
- addr = "%s" % subnet.get('address')
- if 'prefix' in subnet:
- addr += "/%d" % subnet.get('prefix')
- if 'gateway' in subnet and subnet.get('gateway'):
- gateway = subnet.get('gateway')
+ entry.update({"dhcp6": True})
+ elif sn_type in ["static", "static6"]:
+ addr = "%s" % subnet.get("address")
+ if "prefix" in subnet:
+ addr += "/%d" % subnet.get("prefix")
+ if "gateway" in subnet and subnet.get("gateway"):
+ gateway = subnet.get("gateway")
if ":" in gateway:
- entry.update({'gateway6': gateway})
+ entry.update({"gateway6": gateway})
else:
- entry.update({'gateway4': gateway})
- if 'dns_nameservers' in subnet:
- nameservers += _listify(subnet.get('dns_nameservers', []))
- if 'dns_search' in subnet:
- searchdomains += _listify(subnet.get('dns_search', []))
- if 'mtu' in subnet:
- mtukey = 'mtu'
- if subnet_is_ipv6(subnet) and 'ipv6-mtu' in features:
- mtukey = 'ipv6-mtu'
- entry.update({mtukey: subnet.get('mtu')})
- for route in subnet.get('routes', []):
- to_net = "%s/%s" % (route.get('network'),
- route.get('prefix'))
+ entry.update({"gateway4": gateway})
+ if "dns_nameservers" in subnet:
+ nameservers += _listify(subnet.get("dns_nameservers", []))
+ if "dns_search" in subnet:
+ searchdomains += _listify(subnet.get("dns_search", []))
+ if "mtu" in subnet:
+ mtukey = "mtu"
+ if subnet_is_ipv6(subnet) and "ipv6-mtu" in features:
+ mtukey = "ipv6-mtu"
+ entry.update({mtukey: subnet.get("mtu")})
+ for route in subnet.get("routes", []):
+ to_net = "%s/%s" % (route.get("network"), route.get("prefix"))
new_route = {
- 'via': route.get('gateway'),
- 'to': to_net,
+ "via": route.get("gateway"),
+ "to": to_net,
}
- if 'metric' in route:
- new_route.update({'metric': route.get('metric', 100)})
+ if "metric" in route:
+ new_route.update({"metric": route.get("metric", 100)})
routes.append(new_route)
addresses.append(addr)
- if 'mtu' in config:
- entry_mtu = entry.get('mtu')
- if entry_mtu and config['mtu'] != entry_mtu:
+ if "mtu" in config:
+ entry_mtu = entry.get("mtu")
+ if entry_mtu and config["mtu"] != entry_mtu:
LOG.warning(
"Network config: ignoring %s device-level mtu:%s because"
" ipv4 subnet-level mtu:%s provided.",
- ifname, config['mtu'], entry_mtu)
+ ifname,
+ config["mtu"],
+ entry_mtu,
+ )
else:
- entry['mtu'] = config['mtu']
+ entry["mtu"] = config["mtu"]
if len(addresses) > 0:
- entry.update({'addresses': addresses})
+ entry.update({"addresses": addresses})
if len(routes) > 0:
- entry.update({'routes': routes})
+ entry.update({"routes": routes})
if len(nameservers) > 0:
- ns = {'addresses': nameservers}
- entry.update({'nameservers': ns})
+ ns = {"addresses": nameservers}
+ entry.update({"nameservers": ns})
if len(searchdomains) > 0:
- ns = entry.get('nameservers', {})
- ns.update({'search': searchdomains})
- entry.update({'nameservers': ns})
- if 'accept-ra' in config and config['accept-ra'] is not None:
- entry.update({'accept-ra': util.is_true(config.get('accept-ra'))})
+ ns = entry.get("nameservers", {})
+ ns.update({"search": searchdomains})
+ entry.update({"nameservers": ns})
+ if "accept-ra" in config and config["accept-ra"] is not None:
+ entry.update({"accept-ra": util.is_true(config.get("accept-ra"))})
def _extract_bond_slaves_by_name(interfaces, entry, bond_master):
- bond_slave_names = sorted([name for (name, cfg) in interfaces.items()
- if cfg.get('bond-master', None) == bond_master])
+ bond_slave_names = sorted(
+ [
+ name
+ for (name, cfg) in interfaces.items()
+ if cfg.get("bond-master", None) == bond_master
+ ]
+ )
if len(bond_slave_names) > 0:
- entry.update({'interfaces': bond_slave_names})
+ entry.update({"interfaces": bond_slave_names})
def _clean_default(target=None):
@@ -177,13 +187,20 @@ def _clean_default(target=None):
if content != KNOWN_SNAPD_CONFIG:
return
- derived = [subp.target_path(target, f) for f in (
- 'run/systemd/network/10-netplan-all-en.network',
- 'run/systemd/network/10-netplan-all-eth.network',
- 'run/systemd/generator/netplan.stamp')]
+ derived = [
+ subp.target_path(target, f)
+ for f in (
+ "run/systemd/network/10-netplan-all-en.network",
+ "run/systemd/network/10-netplan-all-eth.network",
+ "run/systemd/generator/netplan.stamp",
+ )
+ ]
existing = [f for f in derived if os.path.isfile(f)]
- LOG.debug("removing known config '%s' and derived existing files: %s",
- tpath, existing)
+ LOG.debug(
+ "removing known config '%s' and derived existing files: %s",
+ tpath,
+ existing,
+ )
for f in [tpath] + existing:
os.unlink(f)
@@ -192,18 +209,19 @@ def _clean_default(target=None):
class Renderer(renderer.Renderer):
"""Renders network information in a /etc/netplan/network.yaml format."""
- NETPLAN_GENERATE = ['netplan', 'generate']
- NETPLAN_INFO = ['netplan', 'info']
+ NETPLAN_GENERATE = ["netplan", "generate"]
+ NETPLAN_INFO = ["netplan", "info"]
def __init__(self, config=None):
if not config:
config = {}
- self.netplan_path = config.get('netplan_path',
- 'etc/netplan/50-cloud-init.yaml')
- self.netplan_header = config.get('netplan_header', None)
- self._postcmds = config.get('postcmds', False)
- self.clean_default = config.get('clean_default', True)
- self._features = config.get('features', None)
+ self.netplan_path = config.get(
+ "netplan_path", "etc/netplan/50-cloud-init.yaml"
+ )
+ self.netplan_header = config.get("netplan_header", None)
+ self._postcmds = config.get("postcmds", False)
+ self.clean_default = config.get("clean_default", True)
+ self._features = config.get("features", None)
@property
def features(self):
@@ -211,13 +229,13 @@ class Renderer(renderer.Renderer):
try:
info_blob, _err = subp.subp(self.NETPLAN_INFO, capture=True)
info = util.load_yaml(info_blob)
- self._features = info['netplan.io']['features']
+ self._features = info["netplan.io"]["features"]
except subp.ProcessExecutionError:
# if the info subcommand is not present then we don't have any
# new features
pass
except (TypeError, KeyError) as e:
- LOG.debug('Failed to list features from netplan info: %s', e)
+ LOG.debug("Failed to list features from netplan info: %s", e)
return self._features
def render_network_state(self, network_state, templates=None, target=None):
@@ -249,26 +267,30 @@ class Renderer(renderer.Renderer):
def _net_setup_link(self, run=False):
"""To ensure device link properties are applied, we poke
- udev to re-evaluate networkd .link files and call
- the setup_link udev builtin command
+ udev to re-evaluate networkd .link files and call
+ the setup_link udev builtin command
"""
if not run:
LOG.debug("netplan net_setup_link postcmd disabled")
return
- setup_lnk = ['udevadm', 'test-builtin', 'net_setup_link']
- for cmd in [setup_lnk + [SYS_CLASS_NET + iface]
- for iface in get_devicelist() if
- os.path.islink(SYS_CLASS_NET + iface)]:
+ setup_lnk = ["udevadm", "test-builtin", "net_setup_link"]
+ for cmd in [
+ setup_lnk + [SYS_CLASS_NET + iface]
+ for iface in get_devicelist()
+ if os.path.islink(SYS_CLASS_NET + iface)
+ ]:
subp.subp(cmd, capture=True)
def _render_content(self, network_state: NetworkState):
# if content already in netplan format, pass it back
if network_state.version == 2:
- LOG.debug('V2 to V2 passthrough')
- return safeyaml.dumps({'network': network_state.config},
- explicit_start=False,
- explicit_end=False)
+ LOG.debug("V2 to V2 passthrough")
+ return safeyaml.dumps(
+ {"network": network_state.config},
+ explicit_start=False,
+ explicit_end=False,
+ )
ethernets = {}
wifis = {}
@@ -277,80 +299,83 @@ class Renderer(renderer.Renderer):
vlans = {}
content = []
- interfaces = network_state._network_state.get('interfaces', [])
+ interfaces = network_state._network_state.get("interfaces", [])
nameservers = network_state.dns_nameservers
searchdomains = network_state.dns_searchdomains
for config in network_state.iter_interfaces():
- ifname = config.get('name')
+ ifname = config.get("name")
# filter None (but not False) entries up front
- ifcfg = dict((key, value) for (key, value) in config.items()
- if value is not None)
-
- if_type = ifcfg.get('type')
- if if_type == 'physical':
+ ifcfg = dict(
+ (key, value)
+ for (key, value) in config.items()
+ if value is not None
+ )
+
+ if_type = ifcfg.get("type")
+ if if_type == "physical":
# required_keys = ['name', 'mac_address']
eth = {
- 'set-name': ifname,
- 'match': ifcfg.get('match', None),
+ "set-name": ifname,
+ "match": ifcfg.get("match", None),
}
- if eth['match'] is None:
- macaddr = ifcfg.get('mac_address', None)
+ if eth["match"] is None:
+ macaddr = ifcfg.get("mac_address", None)
if macaddr is not None:
- eth['match'] = {'macaddress': macaddr.lower()}
+ eth["match"] = {"macaddress": macaddr.lower()}
else:
- del eth['match']
- del eth['set-name']
+ del eth["match"]
+ del eth["set-name"]
_extract_addresses(ifcfg, eth, ifname, self.features)
ethernets.update({ifname: eth})
- elif if_type == 'bond':
+ elif if_type == "bond":
# required_keys = ['name', 'bond_interfaces']
bond = {}
bond_config = {}
# extract bond params and drop the bond_ prefix as it's
# redundent in v2 yaml format
- v2_bond_map = NET_CONFIG_TO_V2.get('bond')
- for match in ['bond_', 'bond-']:
+ v2_bond_map = NET_CONFIG_TO_V2.get("bond")
+ for match in ["bond_", "bond-"]:
bond_params = _get_params_dict_by_match(ifcfg, match)
for (param, value) in bond_params.items():
- newname = v2_bond_map.get(param.replace('_', '-'))
+ newname = v2_bond_map.get(param.replace("_", "-"))
if newname is None:
continue
bond_config.update({newname: value})
if len(bond_config) > 0:
- bond.update({'parameters': bond_config})
- if ifcfg.get('mac_address'):
- bond['macaddress'] = ifcfg.get('mac_address').lower()
- slave_interfaces = ifcfg.get('bond-slaves')
- if slave_interfaces == 'none':
+ bond.update({"parameters": bond_config})
+ if ifcfg.get("mac_address"):
+ bond["macaddress"] = ifcfg.get("mac_address").lower()
+ slave_interfaces = ifcfg.get("bond-slaves")
+ if slave_interfaces == "none":
_extract_bond_slaves_by_name(interfaces, bond, ifname)
_extract_addresses(ifcfg, bond, ifname, self.features)
bonds.update({ifname: bond})
- elif if_type == 'bridge':
+ elif if_type == "bridge":
# required_keys = ['name', 'bridge_ports']
- ports = sorted(copy.copy(ifcfg.get('bridge_ports')))
+ ports = sorted(copy.copy(ifcfg.get("bridge_ports")))
bridge = {
- 'interfaces': ports,
+ "interfaces": ports,
}
# extract bridge params and drop the bridge prefix as it's
# redundent in v2 yaml format
- match_prefix = 'bridge_'
+ match_prefix = "bridge_"
params = _get_params_dict_by_match(ifcfg, match_prefix)
br_config = {}
# v2 yaml uses different names for the keys
# and at least one value format change
- v2_bridge_map = NET_CONFIG_TO_V2.get('bridge')
+ v2_bridge_map = NET_CONFIG_TO_V2.get("bridge")
for (param, value) in params.items():
newname = v2_bridge_map.get(param)
if newname is None:
continue
br_config.update({newname: value})
- if newname in ['path-cost', 'port-priority']:
+ if newname in ["path-cost", "port-priority"]:
# <interface> <value> -> <interface>: int(<value>)
newvalue = {}
for val in value:
@@ -359,58 +384,60 @@ class Renderer(renderer.Renderer):
br_config.update({newname: newvalue})
if len(br_config) > 0:
- bridge.update({'parameters': br_config})
- if ifcfg.get('mac_address'):
- bridge['macaddress'] = ifcfg.get('mac_address').lower()
+ bridge.update({"parameters": br_config})
+ if ifcfg.get("mac_address"):
+ bridge["macaddress"] = ifcfg.get("mac_address").lower()
_extract_addresses(ifcfg, bridge, ifname, self.features)
bridges.update({ifname: bridge})
- elif if_type == 'vlan':
+ elif if_type == "vlan":
# required_keys = ['name', 'vlan_id', 'vlan-raw-device']
vlan = {
- 'id': ifcfg.get('vlan_id'),
- 'link': ifcfg.get('vlan-raw-device')
+ "id": ifcfg.get("vlan_id"),
+ "link": ifcfg.get("vlan-raw-device"),
}
- macaddr = ifcfg.get('mac_address', None)
+ macaddr = ifcfg.get("mac_address", None)
if macaddr is not None:
- vlan['macaddress'] = macaddr.lower()
+ vlan["macaddress"] = macaddr.lower()
_extract_addresses(ifcfg, vlan, ifname, self.features)
vlans.update({ifname: vlan})
# inject global nameserver values under each all interface which
# has addresses and do not already have a DNS configuration
if nameservers or searchdomains:
- nscfg = {'addresses': nameservers, 'search': searchdomains}
+ nscfg = {"addresses": nameservers, "search": searchdomains}
for section in [ethernets, wifis, bonds, bridges, vlans]:
for _name, cfg in section.items():
- if 'nameservers' in cfg or 'addresses' not in cfg:
+ if "nameservers" in cfg or "addresses" not in cfg:
continue
- cfg.update({'nameservers': nscfg})
+ cfg.update({"nameservers": nscfg})
# workaround yaml dictionary key sorting when dumping
def _render_section(name, section):
if section:
- dump = safeyaml.dumps({name: section},
- explicit_start=False,
- explicit_end=False,
- noalias=True)
- txt = util.indent(dump, ' ' * 4)
+ dump = safeyaml.dumps(
+ {name: section},
+ explicit_start=False,
+ explicit_end=False,
+ noalias=True,
+ )
+ txt = util.indent(dump, " " * 4)
return [txt]
return []
content.append("network:\n version: 2\n")
- content += _render_section('ethernets', ethernets)
- content += _render_section('wifis', wifis)
- content += _render_section('bonds', bonds)
- content += _render_section('bridges', bridges)
- content += _render_section('vlans', vlans)
+ content += _render_section("ethernets", ethernets)
+ content += _render_section("wifis", wifis)
+ content += _render_section("bonds", bonds)
+ content += _render_section("bridges", bridges)
+ content += _render_section("vlans", vlans)
return "".join(content)
def available(target=None):
- expected = ['netplan']
- search = ['/usr/sbin', '/sbin']
+ expected = ["netplan"]
+ search = ["/usr/sbin", "/sbin"]
for p in expected:
if not subp.which(p, search=search, target=target):
return False
@@ -419,11 +446,13 @@ def available(target=None):
def network_state_to_netplan(network_state, header=None):
# render the provided network state, return a string of equivalent eni
- netplan_path = 'etc/network/50-cloud-init.yaml'
- renderer = Renderer({
- 'netplan_path': netplan_path,
- 'netplan_header': header,
- })
+ netplan_path = "etc/network/50-cloud-init.yaml"
+ renderer = Renderer(
+ {
+ "netplan_path": netplan_path,
+ "netplan_header": header,
+ }
+ )
if not header:
header = ""
if not header.endswith("\n"):
diff --git a/cloudinit/net/network_state.py b/cloudinit/net/network_state.py
index 4862bf91..d7c9144f 100644
--- a/cloudinit/net/network_state.py
+++ b/cloudinit/net/network_state.py
@@ -10,52 +10,70 @@ import logging
import socket
import struct
-from cloudinit import safeyaml
-from cloudinit import util
+from cloudinit import safeyaml, util
LOG = logging.getLogger(__name__)
NETWORK_STATE_VERSION = 1
-IPV6_DYNAMIC_TYPES = ['dhcp6',
- 'ipv6_slaac',
- 'ipv6_dhcpv6-stateless',
- 'ipv6_dhcpv6-stateful']
+IPV6_DYNAMIC_TYPES = [
+ "dhcp6",
+ "ipv6_slaac",
+ "ipv6_dhcpv6-stateless",
+ "ipv6_dhcpv6-stateful",
+]
NETWORK_STATE_REQUIRED_KEYS = {
- 1: ['version', 'config', 'network_state'],
+ 1: ["version", "config", "network_state"],
}
NETWORK_V2_KEY_FILTER = [
- 'addresses', 'dhcp4', 'dhcp4-overrides', 'dhcp6', 'dhcp6-overrides',
- 'gateway4', 'gateway6', 'interfaces', 'match', 'mtu', 'nameservers',
- 'renderer', 'set-name', 'wakeonlan', 'accept-ra'
+ "addresses",
+ "dhcp4",
+ "dhcp4-overrides",
+ "dhcp6",
+ "dhcp6-overrides",
+ "gateway4",
+ "gateway6",
+ "interfaces",
+ "match",
+ "mtu",
+ "nameservers",
+ "renderer",
+ "set-name",
+ "wakeonlan",
+ "accept-ra",
]
NET_CONFIG_TO_V2 = {
- 'bond': {'bond-ad-select': 'ad-select',
- 'bond-arp-interval': 'arp-interval',
- 'bond-arp-ip-target': 'arp-ip-target',
- 'bond-arp-validate': 'arp-validate',
- 'bond-downdelay': 'down-delay',
- 'bond-fail-over-mac': 'fail-over-mac-policy',
- 'bond-lacp-rate': 'lacp-rate',
- 'bond-miimon': 'mii-monitor-interval',
- 'bond-min-links': 'min-links',
- 'bond-mode': 'mode',
- 'bond-num-grat-arp': 'gratuitious-arp',
- 'bond-primary': 'primary',
- 'bond-primary-reselect': 'primary-reselect-policy',
- 'bond-updelay': 'up-delay',
- 'bond-xmit-hash-policy': 'transmit-hash-policy'},
- 'bridge': {'bridge_ageing': 'ageing-time',
- 'bridge_bridgeprio': 'priority',
- 'bridge_fd': 'forward-delay',
- 'bridge_gcint': None,
- 'bridge_hello': 'hello-time',
- 'bridge_maxage': 'max-age',
- 'bridge_maxwait': None,
- 'bridge_pathcost': 'path-cost',
- 'bridge_portprio': 'port-priority',
- 'bridge_stp': 'stp',
- 'bridge_waitport': None}}
+ "bond": {
+ "bond-ad-select": "ad-select",
+ "bond-arp-interval": "arp-interval",
+ "bond-arp-ip-target": "arp-ip-target",
+ "bond-arp-validate": "arp-validate",
+ "bond-downdelay": "down-delay",
+ "bond-fail-over-mac": "fail-over-mac-policy",
+ "bond-lacp-rate": "lacp-rate",
+ "bond-miimon": "mii-monitor-interval",
+ "bond-min-links": "min-links",
+ "bond-mode": "mode",
+ "bond-num-grat-arp": "gratuitious-arp",
+ "bond-primary": "primary",
+ "bond-primary-reselect": "primary-reselect-policy",
+ "bond-updelay": "up-delay",
+ "bond-xmit-hash-policy": "transmit-hash-policy",
+ },
+ "bridge": {
+ "bridge_ageing": "ageing-time",
+ "bridge_bridgeprio": "priority",
+ "bridge_fd": "forward-delay",
+ "bridge_gcint": None,
+ "bridge_hello": "hello-time",
+ "bridge_maxage": "max-age",
+ "bridge_maxwait": None,
+ "bridge_pathcost": "path-cost",
+ "bridge_portprio": "port-priority",
+ "bridge_stp": "stp",
+ "bridge_waitport": None,
+ },
+}
def from_state_file(state_file):
@@ -77,17 +95,16 @@ class InvalidCommand(Exception):
def ensure_command_keys(required_keys):
-
def wrapper(func):
-
@functools.wraps(func)
def decorator(self, command, *args, **kwargs):
if required_keys:
missing_keys = diff_keys(required_keys, command)
if missing_keys:
- raise InvalidCommand("Command missing %s of required"
- " keys %s" % (missing_keys,
- required_keys))
+ raise InvalidCommand(
+ "Command missing %s of required keys %s"
+ % (missing_keys, required_keys)
+ )
return func(self, command, *args, **kwargs)
return decorator
@@ -102,29 +119,28 @@ class CommandHandlerMeta(type):
'handle_' and on finding those will populate a class attribute mapping
so that those methods can be quickly located and called.
"""
+
def __new__(cls, name, parents, dct):
command_handlers = {}
for attr_name, attr in dct.items():
- if callable(attr) and attr_name.startswith('handle_'):
- handles_what = attr_name[len('handle_'):]
+ if callable(attr) and attr_name.startswith("handle_"):
+ handles_what = attr_name[len("handle_") :]
if handles_what:
command_handlers[handles_what] = attr
- dct['command_handlers'] = command_handlers
- return super(CommandHandlerMeta, cls).__new__(cls, name,
- parents, dct)
+ dct["command_handlers"] = command_handlers
+ return super(CommandHandlerMeta, cls).__new__(cls, name, parents, dct)
class NetworkState(object):
-
def __init__(self, network_state, version=NETWORK_STATE_VERSION):
self._network_state = copy.deepcopy(network_state)
self._version = version
- self.use_ipv6 = network_state.get('use_ipv6', False)
+ self.use_ipv6 = network_state.get("use_ipv6", False)
self._has_default_route = None
@property
def config(self):
- return self._network_state['config']
+ return self._network_state["config"]
@property
def version(self):
@@ -133,14 +149,14 @@ class NetworkState(object):
@property
def dns_nameservers(self):
try:
- return self._network_state['dns']['nameservers']
+ return self._network_state["dns"]["nameservers"]
except KeyError:
return []
@property
def dns_searchdomains(self):
try:
- return self._network_state['dns']['search']
+ return self._network_state["dns"]["search"]
except KeyError:
return []
@@ -151,7 +167,7 @@ class NetworkState(object):
return self._has_default_route
def iter_interfaces(self, filter_func=None):
- ifaces = self._network_state.get('interfaces', {})
+ ifaces = self._network_state.get("interfaces", {})
for iface in ifaces.values():
if filter_func is None:
yield iface
@@ -160,7 +176,7 @@ class NetworkState(object):
yield iface
def iter_routes(self, filter_func=None):
- for route in self._network_state.get('routes', []):
+ for route in self._network_state.get("routes", []):
if filter_func is not None:
if filter_func(route):
yield route
@@ -172,38 +188,37 @@ class NetworkState(object):
if self._is_default_route(route):
return True
for iface in self.iter_interfaces():
- for subnet in iface.get('subnets', []):
- for route in subnet.get('routes', []):
+ for subnet in iface.get("subnets", []):
+ for route in subnet.get("routes", []):
if self._is_default_route(route):
return True
return False
def _is_default_route(self, route):
- default_nets = ('::', '0.0.0.0')
+ default_nets = ("::", "0.0.0.0")
return (
- route.get('prefix') == 0
- and route.get('network') in default_nets
+ route.get("prefix") == 0 and route.get("network") in default_nets
)
class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
initial_network_state = {
- 'interfaces': {},
- 'routes': [],
- 'dns': {
- 'nameservers': [],
- 'search': [],
+ "interfaces": {},
+ "routes": [],
+ "dns": {
+ "nameservers": [],
+ "search": [],
},
- 'use_ipv6': False,
- 'config': None,
+ "use_ipv6": False,
+ "config": None,
}
def __init__(self, version=NETWORK_STATE_VERSION, config=None):
self._version = version
self._config = config
self._network_state = copy.deepcopy(self.initial_network_state)
- self._network_state['config'] = config
+ self._network_state["config"] = config
self._parsed = False
self._interface_dns_map = {}
@@ -213,41 +228,41 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
@property
def use_ipv6(self):
- return self._network_state.get('use_ipv6')
+ return self._network_state.get("use_ipv6")
@use_ipv6.setter
def use_ipv6(self, val):
- self._network_state.update({'use_ipv6': val})
+ self._network_state.update({"use_ipv6": val})
def dump(self):
state = {
- 'version': self._version,
- 'config': self._config,
- 'network_state': self._network_state,
+ "version": self._version,
+ "config": self._config,
+ "network_state": self._network_state,
}
return safeyaml.dumps(state)
def load(self, state):
- if 'version' not in state:
- LOG.error('Invalid state, missing version field')
- raise ValueError('Invalid state, missing version field')
+ if "version" not in state:
+ LOG.error("Invalid state, missing version field")
+ raise ValueError("Invalid state, missing version field")
- required_keys = NETWORK_STATE_REQUIRED_KEYS[state['version']]
+ required_keys = NETWORK_STATE_REQUIRED_KEYS[state["version"]]
missing_keys = diff_keys(required_keys, state)
if missing_keys:
- msg = 'Invalid state, missing keys: %s' % (missing_keys)
+ msg = "Invalid state, missing keys: %s" % (missing_keys)
LOG.error(msg)
raise ValueError(msg)
# v1 - direct attr mapping, except version
- for key in [k for k in required_keys if k not in ['version']]:
+ for key in [k for k in required_keys if k not in ["version"]]:
setattr(self, key, state[key])
def dump_network_state(self):
return safeyaml.dumps(self._network_state)
def as_dict(self):
- return {'version': self._version, 'config': self._config}
+ return {"version": self._version, "config": self._config}
def get_network_state(self):
ns = self.network_state
@@ -263,7 +278,7 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
def parse_config_v1(self, skip_broken=True):
for command in self._config:
- command_type = command['type']
+ command_type = command["type"]
try:
handler = self.command_handlers[command_type]
except KeyError as e:
@@ -276,28 +291,29 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
if not skip_broken:
raise
else:
- LOG.warning("Skipping invalid command: %s", command,
- exc_info=True)
+ LOG.warning(
+ "Skipping invalid command: %s", command, exc_info=True
+ )
LOG.debug(self.dump_network_state())
for interface, dns in self._interface_dns_map.items():
iface = None
try:
- iface = self._network_state['interfaces'][interface]
+ iface = self._network_state["interfaces"][interface]
except KeyError as e:
raise ValueError(
- 'Nameserver specified for interface {0}, '
- 'but interface {0} does not exist!'.format(interface)
+ "Nameserver specified for interface {0}, "
+ "but interface {0} does not exist!".format(interface)
) from e
if iface:
nameservers, search = dns
- iface['dns'] = {
- 'addresses': nameservers,
- 'search': search,
+ iface["dns"] = {
+ "addresses": nameservers,
+ "search": search,
}
def parse_config_v2(self, skip_broken=True):
for command_type, command in self._config.items():
- if command_type in ['version', 'renderer']:
+ if command_type in ["version", "renderer"]:
continue
try:
handler = self.command_handlers[command_type]
@@ -312,17 +328,18 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
if not skip_broken:
raise
else:
- LOG.warning("Skipping invalid command: %s", command,
- exc_info=True)
+ LOG.warning(
+ "Skipping invalid command: %s", command, exc_info=True
+ )
LOG.debug(self.dump_network_state())
- @ensure_command_keys(['name'])
+ @ensure_command_keys(["name"])
def handle_loopback(self, command):
return self.handle_physical(command)
- @ensure_command_keys(['name'])
+ @ensure_command_keys(["name"])
def handle_physical(self, command):
- '''
+ """
command = {
'type': 'physical',
'mac_address': 'c0:d6:9f:2c:e8:80',
@@ -332,119 +349,122 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
],
'accept-ra': 'true'
}
- '''
+ """
- interfaces = self._network_state.get('interfaces', {})
- iface = interfaces.get(command['name'], {})
- for param, val in command.get('params', {}).items():
+ interfaces = self._network_state.get("interfaces", {})
+ iface = interfaces.get(command["name"], {})
+ for param, val in command.get("params", {}).items():
iface.update({param: val})
# convert subnet ipv6 netmask to cidr as needed
- subnets = _normalize_subnets(command.get('subnets'))
+ subnets = _normalize_subnets(command.get("subnets"))
# automatically set 'use_ipv6' if any addresses are ipv6
if not self.use_ipv6:
for subnet in subnets:
- if (subnet.get('type').endswith('6') or
- is_ipv6_addr(subnet.get('address'))):
+ if subnet.get("type").endswith("6") or is_ipv6_addr(
+ subnet.get("address")
+ ):
self.use_ipv6 = True
break
- accept_ra = command.get('accept-ra', None)
+ accept_ra = command.get("accept-ra", None)
if accept_ra is not None:
accept_ra = util.is_true(accept_ra)
- wakeonlan = command.get('wakeonlan', None)
+ wakeonlan = command.get("wakeonlan", None)
if wakeonlan is not None:
wakeonlan = util.is_true(wakeonlan)
- iface.update({
- 'name': command.get('name'),
- 'type': command.get('type'),
- 'mac_address': command.get('mac_address'),
- 'inet': 'inet',
- 'mode': 'manual',
- 'mtu': command.get('mtu'),
- 'address': None,
- 'gateway': None,
- 'subnets': subnets,
- 'accept-ra': accept_ra,
- 'wakeonlan': wakeonlan,
- })
- self._network_state['interfaces'].update({command.get('name'): iface})
+ iface.update(
+ {
+ "name": command.get("name"),
+ "type": command.get("type"),
+ "mac_address": command.get("mac_address"),
+ "inet": "inet",
+ "mode": "manual",
+ "mtu": command.get("mtu"),
+ "address": None,
+ "gateway": None,
+ "subnets": subnets,
+ "accept-ra": accept_ra,
+ "wakeonlan": wakeonlan,
+ }
+ )
+ self._network_state["interfaces"].update({command.get("name"): iface})
self.dump_network_state()
- @ensure_command_keys(['name', 'vlan_id', 'vlan_link'])
+ @ensure_command_keys(["name", "vlan_id", "vlan_link"])
def handle_vlan(self, command):
- '''
- auto eth0.222
- iface eth0.222 inet static
- address 10.10.10.1
- netmask 255.255.255.0
- hwaddress ether BC:76:4E:06:96:B3
- vlan-raw-device eth0
- '''
- interfaces = self._network_state.get('interfaces', {})
+ """
+ auto eth0.222
+ iface eth0.222 inet static
+ address 10.10.10.1
+ netmask 255.255.255.0
+ hwaddress ether BC:76:4E:06:96:B3
+ vlan-raw-device eth0
+ """
+ interfaces = self._network_state.get("interfaces", {})
self.handle_physical(command)
- iface = interfaces.get(command.get('name'), {})
- iface['vlan-raw-device'] = command.get('vlan_link')
- iface['vlan_id'] = command.get('vlan_id')
- interfaces.update({iface['name']: iface})
+ iface = interfaces.get(command.get("name"), {})
+ iface["vlan-raw-device"] = command.get("vlan_link")
+ iface["vlan_id"] = command.get("vlan_id")
+ interfaces.update({iface["name"]: iface})
- @ensure_command_keys(['name', 'bond_interfaces', 'params'])
+ @ensure_command_keys(["name", "bond_interfaces", "params"])
def handle_bond(self, command):
- '''
- #/etc/network/interfaces
- auto eth0
- iface eth0 inet manual
- bond-master bond0
- bond-mode 802.3ad
-
- auto eth1
- iface eth1 inet manual
- bond-master bond0
- bond-mode 802.3ad
-
- auto bond0
- iface bond0 inet static
- address 192.168.0.10
- gateway 192.168.0.1
- netmask 255.255.255.0
- bond-slaves none
- bond-mode 802.3ad
- bond-miimon 100
- bond-downdelay 200
- bond-updelay 200
- bond-lacp-rate 4
- '''
+ """
+ #/etc/network/interfaces
+ auto eth0
+ iface eth0 inet manual
+ bond-master bond0
+ bond-mode 802.3ad
+
+ auto eth1
+ iface eth1 inet manual
+ bond-master bond0
+ bond-mode 802.3ad
+
+ auto bond0
+ iface bond0 inet static
+ address 192.168.0.10
+ gateway 192.168.0.1
+ netmask 255.255.255.0
+ bond-slaves none
+ bond-mode 802.3ad
+ bond-miimon 100
+ bond-downdelay 200
+ bond-updelay 200
+ bond-lacp-rate 4
+ """
self.handle_physical(command)
- interfaces = self._network_state.get('interfaces')
- iface = interfaces.get(command.get('name'), {})
- for param, val in command.get('params').items():
+ interfaces = self._network_state.get("interfaces")
+ iface = interfaces.get(command.get("name"), {})
+ for param, val in command.get("params").items():
iface.update({param: val})
- iface.update({'bond-slaves': 'none'})
- self._network_state['interfaces'].update({iface['name']: iface})
+ iface.update({"bond-slaves": "none"})
+ self._network_state["interfaces"].update({iface["name"]: iface})
# handle bond slaves
- for ifname in command.get('bond_interfaces'):
+ for ifname in command.get("bond_interfaces"):
if ifname not in interfaces:
cmd = {
- 'name': ifname,
- 'type': 'bond',
+ "name": ifname,
+ "type": "bond",
}
# inject placeholder
self.handle_physical(cmd)
- interfaces = self._network_state.get('interfaces', {})
+ interfaces = self._network_state.get("interfaces", {})
bond_if = interfaces.get(ifname)
- bond_if['bond-master'] = command.get('name')
+ bond_if["bond-master"] = command.get("name")
# copy in bond config into slave
- for param, val in command.get('params').items():
+ for param, val in command.get("params").items():
bond_if.update({param: val})
- self._network_state['interfaces'].update({ifname: bond_if})
+ self._network_state["interfaces"].update({ifname: bond_if})
- @ensure_command_keys(['name', 'bridge_interfaces'])
+ @ensure_command_keys(["name", "bridge_interfaces"])
def handle_bridge(self, command):
- '''
+ """
auto br0
iface br0 inet static
address 10.10.10.1
@@ -469,89 +489,91 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
"bridge_stp",
"bridge_waitport",
]
- '''
+ """
# find one of the bridge port ifaces to get mac_addr
# handle bridge_slaves
- interfaces = self._network_state.get('interfaces', {})
- for ifname in command.get('bridge_interfaces'):
+ interfaces = self._network_state.get("interfaces", {})
+ for ifname in command.get("bridge_interfaces"):
if ifname in interfaces:
continue
cmd = {
- 'name': ifname,
+ "name": ifname,
}
# inject placeholder
self.handle_physical(cmd)
- interfaces = self._network_state.get('interfaces', {})
+ interfaces = self._network_state.get("interfaces", {})
self.handle_physical(command)
- iface = interfaces.get(command.get('name'), {})
- iface['bridge_ports'] = command['bridge_interfaces']
- for param, val in command.get('params', {}).items():
+ iface = interfaces.get(command.get("name"), {})
+ iface["bridge_ports"] = command["bridge_interfaces"]
+ for param, val in command.get("params", {}).items():
iface.update({param: val})
# convert value to boolean
- bridge_stp = iface.get('bridge_stp')
+ bridge_stp = iface.get("bridge_stp")
if bridge_stp is not None and type(bridge_stp) != bool:
- if bridge_stp in ['on', '1', 1]:
+ if bridge_stp in ["on", "1", 1]:
bridge_stp = True
- elif bridge_stp in ['off', '0', 0]:
+ elif bridge_stp in ["off", "0", 0]:
bridge_stp = False
else:
raise ValueError(
- 'Cannot convert bridge_stp value ({stp}) to'
- ' boolean'.format(stp=bridge_stp))
- iface.update({'bridge_stp': bridge_stp})
+ "Cannot convert bridge_stp value ({stp}) to"
+ " boolean".format(stp=bridge_stp)
+ )
+ iface.update({"bridge_stp": bridge_stp})
- interfaces.update({iface['name']: iface})
+ interfaces.update({iface["name"]: iface})
- @ensure_command_keys(['name'])
+ @ensure_command_keys(["name"])
def handle_infiniband(self, command):
self.handle_physical(command)
def _parse_dns(self, command):
nameservers = []
search = []
- if 'address' in command:
- addrs = command['address']
+ if "address" in command:
+ addrs = command["address"]
if not type(addrs) == list:
addrs = [addrs]
for addr in addrs:
nameservers.append(addr)
- if 'search' in command:
- paths = command['search']
+ if "search" in command:
+ paths = command["search"]
if not isinstance(paths, list):
paths = [paths]
for path in paths:
search.append(path)
return nameservers, search
- @ensure_command_keys(['address'])
+ @ensure_command_keys(["address"])
def handle_nameserver(self, command):
- dns = self._network_state.get('dns')
+ dns = self._network_state.get("dns")
nameservers, search = self._parse_dns(command)
- if 'interface' in command:
- self._interface_dns_map[command['interface']] = (
- nameservers, search
+ if "interface" in command:
+ self._interface_dns_map[command["interface"]] = (
+ nameservers,
+ search,
)
else:
- dns['nameservers'].extend(nameservers)
- dns['search'].extend(search)
+ dns["nameservers"].extend(nameservers)
+ dns["search"].extend(search)
- @ensure_command_keys(['address'])
+ @ensure_command_keys(["address"])
def _handle_individual_nameserver(self, command, iface):
- _iface = self._network_state.get('interfaces')
+ _iface = self._network_state.get("interfaces")
nameservers, search = self._parse_dns(command)
- _iface[iface]['dns'] = {'nameservers': nameservers, 'search': search}
+ _iface[iface]["dns"] = {"nameservers": nameservers, "search": search}
- @ensure_command_keys(['destination'])
+ @ensure_command_keys(["destination"])
def handle_route(self, command):
- self._network_state['routes'].append(_normalize_route(command))
+ self._network_state["routes"].append(_normalize_route(command))
# V2 handlers
def handle_bonds(self, command):
- '''
+ """
v2_command = {
bond0: {
'interfaces': ['interface0', 'interface1'],
@@ -578,12 +600,12 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
}
}
- '''
- self._handle_bond_bridge(command, cmd_type='bond')
+ """
+ self._handle_bond_bridge(command, cmd_type="bond")
def handle_bridges(self, command):
- '''
+ """
v2_command = {
br0: {
'interfaces': ['interface0', 'interface1'],
@@ -604,11 +626,11 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
}
}
- '''
- self._handle_bond_bridge(command, cmd_type='bridge')
+ """
+ self._handle_bond_bridge(command, cmd_type="bridge")
def handle_ethernets(self, command):
- '''
+ """
ethernets:
eno1:
match:
@@ -644,34 +666,38 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
{'type': 'dhcp4'}
]
}
- '''
+ """
for eth, cfg in command.items():
phy_cmd = {
- 'type': 'physical',
- 'name': cfg.get('set-name', eth),
+ "type": "physical",
+ "name": cfg.get("set-name", eth),
}
- match = cfg.get('match', {})
- mac_address = match.get('macaddress', None)
+ match = cfg.get("match", {})
+ mac_address = match.get("macaddress", None)
if not mac_address:
- LOG.debug('NetworkState Version2: missing "macaddress" info '
- 'in config entry: %s: %s', eth, str(cfg))
- phy_cmd['mac_address'] = mac_address
- driver = match.get('driver', None)
+ LOG.debug(
+ 'NetworkState Version2: missing "macaddress" info '
+ "in config entry: %s: %s",
+ eth,
+ str(cfg),
+ )
+ phy_cmd["mac_address"] = mac_address
+ driver = match.get("driver", None)
if driver:
- phy_cmd['params'] = {'driver': driver}
- for key in ['mtu', 'match', 'wakeonlan', 'accept-ra']:
+ phy_cmd["params"] = {"driver": driver}
+ for key in ["mtu", "match", "wakeonlan", "accept-ra"]:
if key in cfg:
phy_cmd[key] = cfg[key]
subnets = self._v2_to_v1_ipcfg(cfg)
if len(subnets) > 0:
- phy_cmd.update({'subnets': subnets})
+ phy_cmd.update({"subnets": subnets})
- LOG.debug('v2(ethernets) -> v1(physical):\n%s', phy_cmd)
+ LOG.debug("v2(ethernets) -> v1(physical):\n%s", phy_cmd)
self.handle_physical(phy_cmd)
def handle_vlans(self, command):
- '''
+ """
v2_vlans = {
'eth0.123': {
'id': 123,
@@ -687,41 +713,43 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
'vlan_id': 123,
'subnets': [{'type': 'dhcp4'}],
}
- '''
+ """
for vlan, cfg in command.items():
vlan_cmd = {
- 'type': 'vlan',
- 'name': vlan,
- 'vlan_id': cfg.get('id'),
- 'vlan_link': cfg.get('link'),
+ "type": "vlan",
+ "name": vlan,
+ "vlan_id": cfg.get("id"),
+ "vlan_link": cfg.get("link"),
}
- if 'mtu' in cfg:
- vlan_cmd['mtu'] = cfg['mtu']
+ if "mtu" in cfg:
+ vlan_cmd["mtu"] = cfg["mtu"]
subnets = self._v2_to_v1_ipcfg(cfg)
if len(subnets) > 0:
- vlan_cmd.update({'subnets': subnets})
- LOG.debug('v2(vlans) -> v1(vlan):\n%s', vlan_cmd)
+ vlan_cmd.update({"subnets": subnets})
+ LOG.debug("v2(vlans) -> v1(vlan):\n%s", vlan_cmd)
self.handle_vlan(vlan_cmd)
def handle_wifis(self, command):
- LOG.warning('Wifi configuration is only available to distros with'
- ' netplan rendering support.')
+ LOG.warning(
+ "Wifi configuration is only available to distros with"
+ " netplan rendering support."
+ )
def _v2_common(self, cfg):
- LOG.debug('v2_common: handling config:\n%s', cfg)
+ LOG.debug("v2_common: handling config:\n%s", cfg)
for iface, dev_cfg in cfg.items():
- if 'set-name' in dev_cfg:
- set_name_iface = dev_cfg.get('set-name')
+ if "set-name" in dev_cfg:
+ set_name_iface = dev_cfg.get("set-name")
if set_name_iface:
iface = set_name_iface
- if 'nameservers' in dev_cfg:
- search = dev_cfg.get('nameservers').get('search', [])
- dns = dev_cfg.get('nameservers').get('addresses', [])
- name_cmd = {'type': 'nameserver'}
+ if "nameservers" in dev_cfg:
+ search = dev_cfg.get("nameservers").get("search", [])
+ dns = dev_cfg.get("nameservers").get("addresses", [])
+ name_cmd = {"type": "nameserver"}
if len(search) > 0:
- name_cmd.update({'search': search})
+ name_cmd.update({"search": search})
if len(dns) > 0:
- name_cmd.update({'address': dns})
+ name_cmd.update({"address": dns})
self.handle_nameserver(name_cmd)
self._handle_individual_nameserver(name_cmd, iface)
@@ -729,98 +757,110 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
"""Common handler for bond and bridge types"""
# inverse mapping for v2 keynames to v1 keynames
- v2key_to_v1 = dict((v, k) for k, v in
- NET_CONFIG_TO_V2.get(cmd_type).items())
+ v2key_to_v1 = dict(
+ (v, k) for k, v in NET_CONFIG_TO_V2.get(cmd_type).items()
+ )
for item_name, item_cfg in command.items():
- item_params = dict((key, value) for (key, value) in
- item_cfg.items() if key not in
- NETWORK_V2_KEY_FILTER)
+ item_params = dict(
+ (key, value)
+ for (key, value) in item_cfg.items()
+ if key not in NETWORK_V2_KEY_FILTER
+ )
# we accept the fixed spelling, but write the old for compatibility
# Xenial does not have an updated netplan which supports the
# correct spelling. LP: #1756701
- params = item_params.get('parameters', {})
- grat_value = params.pop('gratuitous-arp', None)
+ params = item_params.get("parameters", {})
+ grat_value = params.pop("gratuitous-arp", None)
if grat_value:
- params['gratuitious-arp'] = grat_value
+ params["gratuitious-arp"] = grat_value
v1_cmd = {
- 'type': cmd_type,
- 'name': item_name,
- cmd_type + '_interfaces': item_cfg.get('interfaces'),
- 'params': dict((v2key_to_v1[k], v) for k, v in params.items())
+ "type": cmd_type,
+ "name": item_name,
+ cmd_type + "_interfaces": item_cfg.get("interfaces"),
+ "params": dict((v2key_to_v1[k], v) for k, v in params.items()),
}
- if 'mtu' in item_cfg:
- v1_cmd['mtu'] = item_cfg['mtu']
+ if "mtu" in item_cfg:
+ v1_cmd["mtu"] = item_cfg["mtu"]
subnets = self._v2_to_v1_ipcfg(item_cfg)
if len(subnets) > 0:
- v1_cmd.update({'subnets': subnets})
+ v1_cmd.update({"subnets": subnets})
- LOG.debug('v2(%s) -> v1(%s):\n%s', cmd_type, cmd_type, v1_cmd)
+ LOG.debug("v2(%s) -> v1(%s):\n%s", cmd_type, cmd_type, v1_cmd)
if cmd_type == "bridge":
self.handle_bridge(v1_cmd)
elif cmd_type == "bond":
self.handle_bond(v1_cmd)
else:
- raise ValueError('Unknown command type: {cmd_type}'.format(
- cmd_type=cmd_type))
+ raise ValueError(
+ "Unknown command type: {cmd_type}".format(
+ cmd_type=cmd_type
+ )
+ )
def _v2_to_v1_ipcfg(self, cfg):
"""Common ipconfig extraction from v2 to v1 subnets array."""
def _add_dhcp_overrides(overrides, subnet):
- if 'route-metric' in overrides:
- subnet['metric'] = overrides['route-metric']
+ if "route-metric" in overrides:
+ subnet["metric"] = overrides["route-metric"]
subnets = []
- if cfg.get('dhcp4'):
- subnet = {'type': 'dhcp4'}
- _add_dhcp_overrides(cfg.get('dhcp4-overrides', {}), subnet)
+ if cfg.get("dhcp4"):
+ subnet = {"type": "dhcp4"}
+ _add_dhcp_overrides(cfg.get("dhcp4-overrides", {}), subnet)
subnets.append(subnet)
- if cfg.get('dhcp6'):
- subnet = {'type': 'dhcp6'}
+ if cfg.get("dhcp6"):
+ subnet = {"type": "dhcp6"}
self.use_ipv6 = True
- _add_dhcp_overrides(cfg.get('dhcp6-overrides', {}), subnet)
+ _add_dhcp_overrides(cfg.get("dhcp6-overrides", {}), subnet)
subnets.append(subnet)
gateway4 = None
gateway6 = None
nameservers = {}
- for address in cfg.get('addresses', []):
+ for address in cfg.get("addresses", []):
subnet = {
- 'type': 'static',
- 'address': address,
+ "type": "static",
+ "address": address,
}
if ":" in address:
- if 'gateway6' in cfg and gateway6 is None:
- gateway6 = cfg.get('gateway6')
- subnet.update({'gateway': gateway6})
+ if "gateway6" in cfg and gateway6 is None:
+ gateway6 = cfg.get("gateway6")
+ subnet.update({"gateway": gateway6})
else:
- if 'gateway4' in cfg and gateway4 is None:
- gateway4 = cfg.get('gateway4')
- subnet.update({'gateway': gateway4})
+ if "gateway4" in cfg and gateway4 is None:
+ gateway4 = cfg.get("gateway4")
+ subnet.update({"gateway": gateway4})
- if 'nameservers' in cfg and not nameservers:
- addresses = cfg.get('nameservers').get('addresses')
+ if "nameservers" in cfg and not nameservers:
+ addresses = cfg.get("nameservers").get("addresses")
if addresses:
- nameservers['dns_nameservers'] = addresses
- search = cfg.get('nameservers').get('search')
+ nameservers["dns_nameservers"] = addresses
+ search = cfg.get("nameservers").get("search")
if search:
- nameservers['dns_search'] = search
+ nameservers["dns_search"] = search
subnet.update(nameservers)
subnets.append(subnet)
routes = []
- for route in cfg.get('routes', []):
- routes.append(_normalize_route(
- {'destination': route.get('to'), 'gateway': route.get('via')}))
+ for route in cfg.get("routes", []):
+ routes.append(
+ _normalize_route(
+ {
+ "destination": route.get("to"),
+ "gateway": route.get("via"),
+ }
+ )
+ )
# v2 routes are bound to the interface, in v1 we add them under
# the first subnet since there isn't an equivalent interface level.
if len(subnets) and len(routes):
- subnets[0]['routes'] = routes
+ subnets[0]["routes"] = routes
return subnets
@@ -830,18 +870,25 @@ def _normalize_subnet(subnet):
subnet = copy.deepcopy(subnet)
normal_subnet = dict((k, v) for k, v in subnet.items() if v)
- if subnet.get('type') in ('static', 'static6'):
+ if subnet.get("type") in ("static", "static6"):
normal_subnet.update(
- _normalize_net_keys(normal_subnet, address_keys=(
- 'address', 'ip_address',)))
- normal_subnet['routes'] = [_normalize_route(r)
- for r in subnet.get('routes', [])]
+ _normalize_net_keys(
+ normal_subnet,
+ address_keys=(
+ "address",
+ "ip_address",
+ ),
+ )
+ )
+ normal_subnet["routes"] = [
+ _normalize_route(r) for r in subnet.get("routes", [])
+ ]
def listify(snet, name):
if name in snet and not isinstance(snet[name], list):
snet[name] = snet[name].split()
- for k in ('dns_search', 'dns_nameservers'):
+ for k in ("dns_search", "dns_nameservers"):
listify(normal_subnet, k)
return normal_subnet
@@ -865,15 +912,16 @@ def _normalize_net_keys(network, address_keys=()):
addr_key = key
break
if not addr_key:
- message = (
- 'No config network address keys [%s] found in %s' %
- (','.join(address_keys), network))
+ message = "No config network address keys [%s] found in %s" % (
+ ",".join(address_keys),
+ network,
+ )
LOG.error(message)
raise ValueError(message)
addr = net.get(addr_key)
ipv6 = is_ipv6_addr(addr)
- netmask = net.get('netmask')
+ netmask = net.get("netmask")
if "/" in addr:
addr_part, _, maybe_prefix = addr.partition("/")
net[addr_key] = addr_part
@@ -884,23 +932,26 @@ def _normalize_net_keys(network, address_keys=()):
prefix = mask_to_net_prefix(maybe_prefix)
elif netmask:
prefix = mask_to_net_prefix(netmask)
- elif 'prefix' in net:
- prefix = int(net['prefix'])
+ elif "prefix" in net:
+ prefix = int(net["prefix"])
else:
prefix = 64 if ipv6 else 24
- if 'prefix' in net and str(net['prefix']) != str(prefix):
- LOG.warning("Overwriting existing 'prefix' with '%s' in "
- "network info: %s", prefix, net)
- net['prefix'] = prefix
+ if "prefix" in net and str(net["prefix"]) != str(prefix):
+ LOG.warning(
+ "Overwriting existing 'prefix' with '%s' in network info: %s",
+ prefix,
+ net,
+ )
+ net["prefix"] = prefix
if ipv6:
# TODO: we could/maybe should add this back with the very uncommon
# 'netmask' for ipv6. We need a 'net_prefix_to_ipv6_mask' for that.
- if 'netmask' in net:
- del net['netmask']
+ if "netmask" in net:
+ del net["netmask"]
else:
- net['netmask'] = net_prefix_to_ipv4_mask(net['prefix'])
+ net["netmask"] = net_prefix_to_ipv4_mask(net["prefix"])
return net
@@ -913,25 +964,28 @@ def _normalize_route(route):
'prefix': the network prefix for address as an integer.
'metric': integer metric (only if present in input).
'netmask': netmask (string) equivalent to prefix iff network is ipv4.
- """
+ """
# Prune None-value keys. Specifically allow 0 (a valid metric).
- normal_route = dict((k, v) for k, v in route.items()
- if v not in ("", None))
- if 'destination' in normal_route:
- normal_route['network'] = normal_route['destination']
- del normal_route['destination']
+ normal_route = dict(
+ (k, v) for k, v in route.items() if v not in ("", None)
+ )
+ if "destination" in normal_route:
+ normal_route["network"] = normal_route["destination"]
+ del normal_route["destination"]
normal_route.update(
_normalize_net_keys(
- normal_route, address_keys=('network', 'destination')))
+ normal_route, address_keys=("network", "destination")
+ )
+ )
- metric = normal_route.get('metric')
+ metric = normal_route.get("metric")
if metric:
try:
- normal_route['metric'] = int(metric)
+ normal_route["metric"] = int(metric)
except ValueError as e:
raise TypeError(
- 'Route config metric {} is not an integer'.format(metric)
+ "Route config metric {} is not an integer".format(metric)
) from e
return normal_route
@@ -952,10 +1006,10 @@ def subnet_is_ipv6(subnet):
"""Common helper for checking network_state subnets for ipv6."""
# 'static6', 'dhcp6', 'ipv6_dhcpv6-stateful', 'ipv6_dhcpv6-stateless' or
# 'ipv6_slaac'
- if subnet['type'].endswith('6') or subnet['type'] in IPV6_DYNAMIC_TYPES:
+ if subnet["type"].endswith("6") or subnet["type"] in IPV6_DYNAMIC_TYPES:
# This is a request either static6 type or DHCPv6.
return True
- elif subnet['type'] == 'static' and is_ipv6_addr(subnet.get('address')):
+ elif subnet["type"] == "static" and is_ipv6_addr(subnet.get("address")):
return True
return False
@@ -967,7 +1021,8 @@ def net_prefix_to_ipv4_mask(prefix):
24 -> "255.255.255.0"
Also supports input as a string."""
mask = socket.inet_ntoa(
- struct.pack(">I", (0xffffffff << (32 - int(prefix)) & 0xffffffff)))
+ struct.pack(">I", (0xFFFFFFFF << (32 - int(prefix)) & 0xFFFFFFFF))
+ )
return mask
@@ -990,14 +1045,14 @@ def ipv4_mask_to_net_prefix(mask):
else:
raise TypeError("mask '%s' is not a string or int")
- if '.' not in mask:
+ if "." not in mask:
raise ValueError("netmask '%s' does not contain a '.'" % mask)
toks = mask.split(".")
if len(toks) != 4:
raise ValueError("netmask '%s' had only %d parts" % (mask, len(toks)))
- return sum([bin(int(x)).count('1') for x in toks])
+ return sum([bin(int(x)).count("1") for x in toks])
def ipv6_mask_to_net_prefix(mask):
@@ -1017,14 +1072,30 @@ def ipv6_mask_to_net_prefix(mask):
else:
raise TypeError("mask '%s' is not a string or int")
- if ':' not in mask:
+ if ":" not in mask:
raise ValueError("mask '%s' does not have a ':'")
- bitCount = [0, 0x8000, 0xc000, 0xe000, 0xf000, 0xf800, 0xfc00, 0xfe00,
- 0xff00, 0xff80, 0xffc0, 0xffe0, 0xfff0, 0xfff8, 0xfffc,
- 0xfffe, 0xffff]
+ bitCount = [
+ 0,
+ 0x8000,
+ 0xC000,
+ 0xE000,
+ 0xF000,
+ 0xF800,
+ 0xFC00,
+ 0xFE00,
+ 0xFF00,
+ 0xFF80,
+ 0xFFC0,
+ 0xFFE0,
+ 0xFFF0,
+ 0xFFF8,
+ 0xFFFC,
+ 0xFFFE,
+ 0xFFFF,
+ ]
prefix = 0
- for word in mask.split(':'):
+ for word in mask.split(":"):
if not word or int(word, 16) == 0:
break
prefix += bitCount.index(int(word, 16))
@@ -1052,11 +1123,12 @@ def mask_and_ipv4_to_bcast_addr(mask, ip):
"""Calculate the broadcast address from the subnet mask and ip addr.
Supports ipv4 only."""
- ip_bin = int(''.join([bin(int(x) + 256)[3:] for x in ip.split('.')]), 2)
+ ip_bin = int("".join([bin(int(x) + 256)[3:] for x in ip.split(".")]), 2)
mask_dec = ipv4_mask_to_net_prefix(mask)
- bcast_bin = ip_bin | (2**(32 - mask_dec) - 1)
- bcast_str = '.'.join([str(bcast_bin >> (i << 3) & 0xFF)
- for i in range(4)[::-1]])
+ bcast_bin = ip_bin | (2 ** (32 - mask_dec) - 1)
+ bcast_str = ".".join(
+ [str(bcast_bin >> (i << 3) & 0xFF) for i in range(4)[::-1]]
+ )
return bcast_str
@@ -1066,8 +1138,8 @@ def parse_net_config_data(net_config, skip_broken=True) -> NetworkState:
:param net_config: curtin network config dict
"""
state = None
- version = net_config.get('version')
- config = net_config.get('config')
+ version = net_config.get("version")
+ config = net_config.get("config")
if version == 2:
# v2 does not have explicit 'config' key so we
# pass the whole net-config as-is
diff --git a/cloudinit/net/networkd.py b/cloudinit/net/networkd.py
index c97c18f6..3bbeb284 100644
--- a/cloudinit/net/networkd.py
+++ b/cloudinit/net/networkd.py
@@ -8,56 +8,57 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+from collections import OrderedDict
+from cloudinit import log as logging
+from cloudinit import subp, util
from . import renderer
-from cloudinit import util
-from cloudinit import subp
-from cloudinit import log as logging
-from collections import OrderedDict
LOG = logging.getLogger(__name__)
class CfgParser:
def __init__(self):
- self.conf_dict = OrderedDict({
- 'Match': [],
- 'Link': [],
- 'Network': [],
- 'DHCPv4': [],
- 'DHCPv6': [],
- 'Address': [],
- 'Route': [],
- })
+ self.conf_dict = OrderedDict(
+ {
+ "Match": [],
+ "Link": [],
+ "Network": [],
+ "DHCPv4": [],
+ "DHCPv6": [],
+ "Address": [],
+ "Route": [],
+ }
+ )
def update_section(self, sec, key, val):
for k in self.conf_dict.keys():
if k == sec:
- self.conf_dict[k].append(key+'='+str(val))
+ self.conf_dict[k].append(key + "=" + str(val))
# remove duplicates from list
self.conf_dict[k] = list(dict.fromkeys(self.conf_dict[k]))
self.conf_dict[k].sort()
def get_final_conf(self):
- contents = ''
+ contents = ""
for k, v in sorted(self.conf_dict.items()):
if not v:
continue
- contents += '['+k+']\n'
+ contents += "[" + k + "]\n"
for e in sorted(v):
- contents += e + '\n'
- contents += '\n'
+ contents += e + "\n"
+ contents += "\n"
return contents
def dump_data(self, target_fn):
if not target_fn:
- LOG.warning('Target file not given')
+ LOG.warning("Target file not given")
return
contents = self.get_final_conf()
- LOG.debug('Final content: %s', contents)
+ LOG.debug("Final content: %s", contents)
util.write_file(target_fn, contents)
@@ -72,17 +73,19 @@ class Renderer(renderer.Renderer):
def __init__(self, config=None):
if not config:
config = {}
- self.resolve_conf_fn = config.get('resolve_conf_fn',
- '/etc/systemd/resolved.conf')
- self.network_conf_dir = config.get('network_conf_dir',
- '/etc/systemd/network/')
+ self.resolve_conf_fn = config.get(
+ "resolve_conf_fn", "/etc/systemd/resolved.conf"
+ )
+ self.network_conf_dir = config.get(
+ "network_conf_dir", "/etc/systemd/network/"
+ )
def generate_match_section(self, iface, cfg):
- sec = 'Match'
+ sec = "Match"
match_dict = {
- 'name': 'Name',
- 'driver': 'Driver',
- 'mac_address': 'MACAddress'
+ "name": "Name",
+ "driver": "Driver",
+ "mac_address": "MACAddress",
}
if not iface:
@@ -92,125 +95,126 @@ class Renderer(renderer.Renderer):
if k in iface and iface[k]:
cfg.update_section(sec, v, iface[k])
- return iface['name']
+ return iface["name"]
def generate_link_section(self, iface, cfg):
- sec = 'Link'
+ sec = "Link"
if not iface:
return
- if 'mtu' in iface and iface['mtu']:
- cfg.update_section(sec, 'MTUBytes', iface['mtu'])
+ if "mtu" in iface and iface["mtu"]:
+ cfg.update_section(sec, "MTUBytes", iface["mtu"])
def parse_routes(self, conf, cfg):
- sec = 'Route'
+ sec = "Route"
route_cfg_map = {
- 'gateway': 'Gateway',
- 'network': 'Destination',
- 'metric': 'Metric',
+ "gateway": "Gateway",
+ "network": "Destination",
+ "metric": "Metric",
}
# prefix is derived using netmask by network_state
- prefix = ''
- if 'prefix' in conf:
- prefix = '/' + str(conf['prefix'])
+ prefix = ""
+ if "prefix" in conf:
+ prefix = "/" + str(conf["prefix"])
for k, v in conf.items():
if k not in route_cfg_map:
continue
- if k == 'network':
+ if k == "network":
v += prefix
cfg.update_section(sec, route_cfg_map[k], v)
def parse_subnets(self, iface, cfg):
- dhcp = 'no'
- sec = 'Network'
- for e in iface.get('subnets', []):
- t = e['type']
- if t == 'dhcp4' or t == 'dhcp':
- if dhcp == 'no':
- dhcp = 'ipv4'
- elif dhcp == 'ipv6':
- dhcp = 'yes'
- elif t == 'dhcp6':
- if dhcp == 'no':
- dhcp = 'ipv6'
- elif dhcp == 'ipv4':
- dhcp = 'yes'
- if 'routes' in e and e['routes']:
- for i in e['routes']:
+ dhcp = "no"
+ sec = "Network"
+ for e in iface.get("subnets", []):
+ t = e["type"]
+ if t == "dhcp4" or t == "dhcp":
+ if dhcp == "no":
+ dhcp = "ipv4"
+ elif dhcp == "ipv6":
+ dhcp = "yes"
+ elif t == "dhcp6":
+ if dhcp == "no":
+ dhcp = "ipv6"
+ elif dhcp == "ipv4":
+ dhcp = "yes"
+ if "routes" in e and e["routes"]:
+ for i in e["routes"]:
self.parse_routes(i, cfg)
- if 'address' in e:
+ if "address" in e:
subnet_cfg_map = {
- 'address': 'Address',
- 'gateway': 'Gateway',
- 'dns_nameservers': 'DNS',
- 'dns_search': 'Domains',
+ "address": "Address",
+ "gateway": "Gateway",
+ "dns_nameservers": "DNS",
+ "dns_search": "Domains",
}
for k, v in e.items():
- if k == 'address':
- if 'prefix' in e:
- v += '/' + str(e['prefix'])
- cfg.update_section('Address', subnet_cfg_map[k], v)
- elif k == 'gateway':
- cfg.update_section('Route', subnet_cfg_map[k], v)
- elif k == 'dns_nameservers' or k == 'dns_search':
- cfg.update_section(sec, subnet_cfg_map[k], ' '.join(v))
-
- cfg.update_section(sec, 'DHCP', dhcp)
-
- if (dhcp in ['ipv6', 'yes'] and
- isinstance(iface.get('accept-ra', ''), bool)):
- cfg.update_section(sec, 'IPv6AcceptRA', iface['accept-ra'])
+ if k == "address":
+ if "prefix" in e:
+ v += "/" + str(e["prefix"])
+ cfg.update_section("Address", subnet_cfg_map[k], v)
+ elif k == "gateway":
+ cfg.update_section("Route", subnet_cfg_map[k], v)
+ elif k == "dns_nameservers" or k == "dns_search":
+ cfg.update_section(sec, subnet_cfg_map[k], " ".join(v))
+
+ cfg.update_section(sec, "DHCP", dhcp)
+
+ if dhcp in ["ipv6", "yes"] and isinstance(
+ iface.get("accept-ra", ""), bool
+ ):
+ cfg.update_section(sec, "IPv6AcceptRA", iface["accept-ra"])
# This is to accommodate extra keys present in VMware config
def dhcp_domain(self, d, cfg):
- for item in ['dhcp4domain', 'dhcp6domain']:
+ for item in ["dhcp4domain", "dhcp6domain"]:
if item not in d:
continue
ret = str(d[item]).casefold()
try:
ret = util.translate_bool(ret)
- ret = 'yes' if ret else 'no'
+ ret = "yes" if ret else "no"
except ValueError:
- if ret != 'route':
- LOG.warning('Invalid dhcp4domain value - %s', ret)
- ret = 'no'
- if item == 'dhcp4domain':
- section = 'DHCPv4'
+ if ret != "route":
+ LOG.warning("Invalid dhcp4domain value - %s", ret)
+ ret = "no"
+ if item == "dhcp4domain":
+ section = "DHCPv4"
else:
- section = 'DHCPv6'
- cfg.update_section(section, 'UseDomains', ret)
+ section = "DHCPv6"
+ cfg.update_section(section, "UseDomains", ret)
def parse_dns(self, iface, cfg, ns):
- sec = 'Network'
+ sec = "Network"
dns_cfg_map = {
- 'search': 'Domains',
- 'nameservers': 'DNS',
- 'addresses': 'DNS',
+ "search": "Domains",
+ "nameservers": "DNS",
+ "addresses": "DNS",
}
- dns = iface.get('dns')
+ dns = iface.get("dns")
if not dns and ns.version == 1:
dns = {
- 'search': ns.dns_searchdomains,
- 'nameservers': ns.dns_nameservers,
+ "search": ns.dns_searchdomains,
+ "nameservers": ns.dns_nameservers,
}
elif not dns and ns.version == 2:
return
for k, v in dns_cfg_map.items():
if k in dns and dns[k]:
- cfg.update_section(sec, v, ' '.join(dns[k]))
+ cfg.update_section(sec, v, " ".join(dns[k]))
def create_network_file(self, link, conf, nwk_dir):
- net_fn_owner = 'systemd-network'
+ net_fn_owner = "systemd-network"
- LOG.debug('Setting Networking Config for %s', link)
+ LOG.debug("Setting Networking Config for %s", link)
- net_fn = nwk_dir + '10-cloud-init-' + link + '.network'
+ net_fn = nwk_dir + "10-cloud-init-" + link + ".network"
util.write_file(net_fn, conf)
util.chownbyname(net_fn, net_fn_owner, net_fn_owner)
@@ -239,7 +243,7 @@ class Renderer(renderer.Renderer):
self.parse_routes(route, cfg)
if ns.version == 2:
- name = iface['name']
+ name = iface["name"]
# network state doesn't give dhcp domain info
# using ns.config as a workaround here
@@ -249,13 +253,13 @@ class Renderer(renderer.Renderer):
# set-name value that matches the current name, then update the
# current name to the device's name. That will be the value in
# the ns.config['ethernets'] dict below.
- for dev_name, dev_cfg in ns.config['ethernets'].items():
- if 'set-name' in dev_cfg:
- if dev_cfg.get('set-name') == name:
+ for dev_name, dev_cfg in ns.config["ethernets"].items():
+ if "set-name" in dev_cfg:
+ if dev_cfg.get("set-name") == name:
name = dev_name
break
- self.dhcp_domain(ns.config['ethernets'][name], cfg)
+ self.dhcp_domain(ns.config["ethernets"][name], cfg)
ret_dict.update({link: cfg.get_final_conf()})
@@ -263,8 +267,8 @@ class Renderer(renderer.Renderer):
def available(target=None):
- expected = ['ip', 'systemctl']
- search = ['/usr/sbin', '/bin']
+ expected = ["ip", "systemctl"]
+ search = ["/usr/sbin", "/bin"]
for p in expected:
if not subp.which(p, search=search, target=target):
return False
diff --git a/cloudinit/net/openbsd.py b/cloudinit/net/openbsd.py
index d87d8a4f..da50d2ba 100644
--- a/cloudinit/net/openbsd.py
+++ b/cloudinit/net/openbsd.py
@@ -1,34 +1,31 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import log as logging
-from cloudinit import subp
-from cloudinit import util
import cloudinit.net.bsd
+from cloudinit import log as logging
+from cloudinit import subp, util
LOG = logging.getLogger(__name__)
class Renderer(cloudinit.net.bsd.BSDRenderer):
-
def write_config(self):
for device_name, v in self.interface_configurations.items():
- if_file = 'etc/hostname.{}'.format(device_name)
+ if_file = "etc/hostname.{}".format(device_name)
fn = subp.target_path(self.target, if_file)
if device_name in self.dhcp_interfaces():
- content = 'dhcp\n'
+ content = "dhcp\n"
elif isinstance(v, dict):
try:
content = "inet {address} {netmask}".format(
- address=v['address'],
- netmask=v['netmask']
+ address=v["address"], netmask=v["netmask"]
)
except KeyError:
LOG.error(
- "Invalid static configuration for %s",
- device_name)
+ "Invalid static configuration for %s", device_name
+ )
mtu = v.get("mtu")
if mtu:
- content += (' mtu %d' % mtu)
+ content += " mtu %d" % mtu
content += "\n"
util.write_file(fn, content)
@@ -36,16 +33,16 @@ class Renderer(cloudinit.net.bsd.BSDRenderer):
if not self._postcmds:
LOG.debug("openbsd generate postcmd disabled")
return
- subp.subp(['pkill', 'dhclient'], capture=True, rcs=[0, 1])
- subp.subp(['route', 'del', 'default'], capture=True, rcs=[0, 1])
- subp.subp(['route', 'flush', 'default'], capture=True, rcs=[0, 1])
- subp.subp(['sh', '/etc/netstart'], capture=True)
+ subp.subp(["pkill", "dhclient"], capture=True, rcs=[0, 1])
+ subp.subp(["route", "del", "default"], capture=True, rcs=[0, 1])
+ subp.subp(["route", "flush", "default"], capture=True, rcs=[0, 1])
+ subp.subp(["sh", "/etc/netstart"], capture=True)
def set_route(self, network, netmask, gateway):
- if network == '0.0.0.0':
- if_file = 'etc/mygate'
+ if network == "0.0.0.0":
+ if_file = "etc/mygate"
fn = subp.target_path(self.target, if_file)
- content = gateway + '\n'
+ content = gateway + "\n"
util.write_file(fn, content)
diff --git a/cloudinit/net/renderer.py b/cloudinit/net/renderer.py
index 54a83b51..34b74b80 100644
--- a/cloudinit/net/renderer.py
+++ b/cloudinit/net/renderer.py
@@ -13,18 +13,18 @@ from cloudinit.net.udev import generate_udev_rule
def filter_by_type(match_type):
- return lambda iface: match_type == iface['type']
+ return lambda iface: match_type == iface["type"]
def filter_by_name(match_name):
- return lambda iface: match_name == iface['name']
+ return lambda iface: match_name == iface["name"]
def filter_by_attr(match_name):
return lambda iface: (match_name in iface and iface[match_name])
-filter_by_physical = filter_by_type('physical')
+filter_by_physical = filter_by_type("physical")
class Renderer(object):
@@ -39,22 +39,27 @@ class Renderer(object):
content = io.StringIO()
for iface in network_state.iter_interfaces(filter_by_physical):
# for physical interfaces write out a persist net udev rule
- if 'name' in iface and iface.get('mac_address'):
- driver = iface.get('driver', None)
- content.write(generate_udev_rule(iface['name'],
- iface['mac_address'],
- driver=driver))
+ if "name" in iface and iface.get("mac_address"):
+ driver = iface.get("driver", None)
+ content.write(
+ generate_udev_rule(
+ iface["name"], iface["mac_address"], driver=driver
+ )
+ )
return content.getvalue()
@abc.abstractmethod
- def render_network_state(self, network_state, templates=None,
- target=None):
+ def render_network_state(self, network_state, templates=None, target=None):
"""Render network state."""
- def render_network_config(self, network_config, templates=None,
- target=None):
+ def render_network_config(
+ self, network_config, templates=None, target=None
+ ):
return self.render_network_state(
network_state=parse_net_config_data(network_config),
- templates=templates, target=target)
+ templates=templates,
+ target=target,
+ )
+
# vi: ts=4 expandtab
diff --git a/cloudinit/net/renderers.py b/cloudinit/net/renderers.py
index 822b45de..c755f04c 100644
--- a/cloudinit/net/renderers.py
+++ b/cloudinit/net/renderers.py
@@ -2,15 +2,17 @@
from typing import List, Tuple, Type
-from . import eni
-from . import freebsd
-from . import netbsd
-from . import netplan
-from . import networkd
-from . import renderer
-from . import RendererNotFoundError
-from . import openbsd
-from . import sysconfig
+from . import (
+ RendererNotFoundError,
+ eni,
+ freebsd,
+ netbsd,
+ netplan,
+ networkd,
+ openbsd,
+ renderer,
+ sysconfig,
+)
NAME_TO_RENDERER = {
"eni": eni,
@@ -22,8 +24,15 @@ NAME_TO_RENDERER = {
"sysconfig": sysconfig,
}
-DEFAULT_PRIORITY = ["eni", "sysconfig", "netplan", "freebsd",
- "netbsd", "openbsd", "networkd"]
+DEFAULT_PRIORITY = [
+ "eni",
+ "sysconfig",
+ "netplan",
+ "freebsd",
+ "netbsd",
+ "openbsd",
+ "networkd",
+]
def search(
@@ -37,7 +46,8 @@ def search(
unknown = [i for i in priority if i not in available]
if unknown:
raise ValueError(
- "Unknown renderers provided in priority list: %s" % unknown)
+ "Unknown renderers provided in priority list: %s" % unknown
+ )
found = []
for name in priority:
@@ -60,8 +70,10 @@ def select(priority=None, target=None) -> Tuple[str, Type[renderer.Renderer]]:
if target and target != "/":
tmsg = " in target=%s" % target
raise RendererNotFoundError(
- "No available network renderers found%s. Searched "
- "through list: %s" % (tmsg, priority))
+ "No available network renderers found%s. Searched through list: %s"
+ % (tmsg, priority)
+ )
return found[0]
+
# vi: ts=4 expandtab
diff --git a/cloudinit/net/sysconfig.py b/cloudinit/net/sysconfig.py
index 85342219..997907bb 100644
--- a/cloudinit/net/sysconfig.py
+++ b/cloudinit/net/sysconfig.py
@@ -8,23 +8,35 @@ import re
from configobj import ConfigObj
from cloudinit import log as logging
-from cloudinit import util
-from cloudinit import subp
-from cloudinit.distros.parsers import networkmanager_conf
-from cloudinit.distros.parsers import resolv_conf
+from cloudinit import subp, util
+from cloudinit.distros.parsers import networkmanager_conf, resolv_conf
from . import renderer
from .network_state import (
- is_ipv6_addr, net_prefix_to_ipv4_mask, subnet_is_ipv6, IPV6_DYNAMIC_TYPES)
+ IPV6_DYNAMIC_TYPES,
+ is_ipv6_addr,
+ net_prefix_to_ipv4_mask,
+ subnet_is_ipv6,
+)
LOG = logging.getLogger(__name__)
-KNOWN_DISTROS = ['almalinux', 'centos', 'cloudlinux', 'eurolinux', 'fedora',
- 'miraclelinux', 'openEuler', 'rhel', 'rocky', 'suse',
- 'virtuozzo']
+KNOWN_DISTROS = [
+ "almalinux",
+ "centos",
+ "cloudlinux",
+ "eurolinux",
+ "fedora",
+ "miraclelinux",
+ "openEuler",
+ "rhel",
+ "rocky",
+ "suse",
+ "virtuozzo",
+]
NM_CFG_FILE = "/etc/NetworkManager/NetworkManager.conf"
-def _make_header(sep='#'):
+def _make_header(sep="#"):
lines = [
"Created by cloud-init on instance boot automatically, do not edit.",
"",
@@ -38,8 +50,8 @@ def _make_header(sep='#'):
def _is_default_route(route):
- default_nets = ('::', '0.0.0.0')
- return route['prefix'] == 0 and route['network'] in default_nets
+ default_nets = ("::", "0.0.0.0")
+ return route["prefix"] == 0 and route["network"] in default_nets
def _quote_value(value):
@@ -56,19 +68,19 @@ def _quote_value(value):
def enable_ifcfg_rh(path):
"""Add ifcfg-rh to NetworkManager.cfg plugins if main section is present"""
config = ConfigObj(path)
- if 'main' in config:
- if 'plugins' in config['main']:
- if 'ifcfg-rh' in config['main']['plugins']:
+ if "main" in config:
+ if "plugins" in config["main"]:
+ if "ifcfg-rh" in config["main"]["plugins"]:
return
else:
- config['main']['plugins'] = []
+ config["main"]["plugins"] = []
- if isinstance(config['main']['plugins'], list):
- config['main']['plugins'].append('ifcfg-rh')
+ if isinstance(config["main"]["plugins"], list):
+ config["main"]["plugins"].append("ifcfg-rh")
else:
- config['main']['plugins'] = [config['main']['plugins'], 'ifcfg-rh']
+ config["main"]["plugins"] = [config["main"]["plugins"], "ifcfg-rh"]
config.write()
- LOG.debug('Enabled ifcfg-rh NetworkManager plugins')
+ LOG.debug("Enabled ifcfg-rh NetworkManager plugins")
class ConfigMap(object):
@@ -76,8 +88,8 @@ class ConfigMap(object):
# Why does redhat prefer yes/no to true/false??
_bool_map = {
- True: 'yes',
- False: 'no',
+ True: "yes",
+ False: "no",
}
def __init__(self):
@@ -128,8 +140,7 @@ class ConfigMap(object):
class Route(ConfigMap):
"""Represents a route configuration."""
- def __init__(self, route_name, base_sysconf_dir,
- ipv4_tpl, ipv6_tpl):
+ def __init__(self, route_name, base_sysconf_dir, ipv4_tpl, ipv6_tpl):
super(Route, self).__init__()
self.last_idx = 1
self.has_set_default_ipv4 = False
@@ -140,8 +151,12 @@ class Route(ConfigMap):
self.route_fn_tpl_ipv6 = ipv6_tpl
def copy(self):
- r = Route(self._route_name, self._base_sysconf_dir,
- self.route_fn_tpl_ipv4, self.route_fn_tpl_ipv6)
+ r = Route(
+ self._route_name,
+ self._base_sysconf_dir,
+ self.route_fn_tpl_ipv4,
+ self.route_fn_tpl_ipv6,
+ )
r._conf = self._conf.copy()
r.last_idx = self.last_idx
r.has_set_default_ipv4 = self.has_set_default_ipv4
@@ -150,20 +165,22 @@ class Route(ConfigMap):
@property
def path_ipv4(self):
- return self.route_fn_tpl_ipv4 % ({'base': self._base_sysconf_dir,
- 'name': self._route_name})
+ return self.route_fn_tpl_ipv4 % (
+ {"base": self._base_sysconf_dir, "name": self._route_name}
+ )
@property
def path_ipv6(self):
- return self.route_fn_tpl_ipv6 % ({'base': self._base_sysconf_dir,
- 'name': self._route_name})
+ return self.route_fn_tpl_ipv6 % (
+ {"base": self._base_sysconf_dir, "name": self._route_name}
+ )
def is_ipv6_route(self, address):
- return ':' in address
+ return ":" in address
def to_string(self, proto="ipv4"):
# only accept ipv4 and ipv6
- if proto not in ['ipv4', 'ipv6']:
+ if proto not in ["ipv4", "ipv6"]:
raise ValueError("Unknown protocol '%s'" % (str(proto)))
buf = io.StringIO()
buf.write(_make_header())
@@ -173,8 +190,8 @@ class Route(ConfigMap):
# (because Route can contain a mix of IPv4 and IPv6)
reindex = -1
for key in sorted(self._conf.keys()):
- if 'ADDRESS' in key:
- index = key.replace('ADDRESS', '')
+ if "ADDRESS" in key:
+ index = key.replace("ADDRESS", "")
address_value = str(self._conf[key])
# only accept combinations:
# if proto ipv6 only display ipv6 routes
@@ -183,33 +200,59 @@ class Route(ConfigMap):
# do not add ipv4 routes if proto is ipv6
# (this array will contain a mix of ipv4 and ipv6)
if proto == "ipv4" and not self.is_ipv6_route(address_value):
- netmask_value = str(self._conf['NETMASK' + index])
- gateway_value = str(self._conf['GATEWAY' + index])
+ netmask_value = str(self._conf["NETMASK" + index])
+ gateway_value = str(self._conf["GATEWAY" + index])
# increase IPv4 index
reindex = reindex + 1
- buf.write("%s=%s\n" % ('ADDRESS' + str(reindex),
- _quote_value(address_value)))
- buf.write("%s=%s\n" % ('GATEWAY' + str(reindex),
- _quote_value(gateway_value)))
- buf.write("%s=%s\n" % ('NETMASK' + str(reindex),
- _quote_value(netmask_value)))
- metric_key = 'METRIC' + index
+ buf.write(
+ "%s=%s\n"
+ % (
+ "ADDRESS" + str(reindex),
+ _quote_value(address_value),
+ )
+ )
+ buf.write(
+ "%s=%s\n"
+ % (
+ "GATEWAY" + str(reindex),
+ _quote_value(gateway_value),
+ )
+ )
+ buf.write(
+ "%s=%s\n"
+ % (
+ "NETMASK" + str(reindex),
+ _quote_value(netmask_value),
+ )
+ )
+ metric_key = "METRIC" + index
if metric_key in self._conf:
- metric_value = str(self._conf['METRIC' + index])
- buf.write("%s=%s\n" % ('METRIC' + str(reindex),
- _quote_value(metric_value)))
+ metric_value = str(self._conf["METRIC" + index])
+ buf.write(
+ "%s=%s\n"
+ % (
+ "METRIC" + str(reindex),
+ _quote_value(metric_value),
+ )
+ )
elif proto == "ipv6" and self.is_ipv6_route(address_value):
- netmask_value = str(self._conf['NETMASK' + index])
- gateway_value = str(self._conf['GATEWAY' + index])
+ netmask_value = str(self._conf["NETMASK" + index])
+ gateway_value = str(self._conf["GATEWAY" + index])
metric_value = (
- 'metric ' + str(self._conf['METRIC' + index])
- if 'METRIC' + index in self._conf else '')
+ "metric " + str(self._conf["METRIC" + index])
+ if "METRIC" + index in self._conf
+ else ""
+ )
buf.write(
- "%s/%s via %s %s dev %s\n" % (address_value,
- netmask_value,
- gateway_value,
- metric_value,
- self._route_name))
+ "%s/%s via %s %s dev %s\n"
+ % (
+ address_value,
+ netmask_value,
+ gateway_value,
+ metric_value,
+ self._route_name,
+ )
+ )
return buf.getvalue()
@@ -218,27 +261,31 @@ class NetInterface(ConfigMap):
"""Represents a sysconfig/networking-script (and its config + children)."""
iface_types = {
- 'ethernet': 'Ethernet',
- 'bond': 'Bond',
- 'bridge': 'Bridge',
- 'infiniband': 'InfiniBand',
- 'vlan': 'Vlan',
+ "ethernet": "Ethernet",
+ "bond": "Bond",
+ "bridge": "Bridge",
+ "infiniband": "InfiniBand",
+ "vlan": "Vlan",
}
- def __init__(self, iface_name, base_sysconf_dir, templates,
- kind='ethernet'):
+ def __init__(
+ self, iface_name, base_sysconf_dir, templates, kind="ethernet"
+ ):
super(NetInterface, self).__init__()
self.children = []
self.templates = templates
- route_tpl = self.templates.get('route_templates')
- self.routes = Route(iface_name, base_sysconf_dir,
- ipv4_tpl=route_tpl.get('ipv4'),
- ipv6_tpl=route_tpl.get('ipv6'))
- self.iface_fn_tpl = self.templates.get('iface_templates')
+ route_tpl = self.templates.get("route_templates")
+ self.routes = Route(
+ iface_name,
+ base_sysconf_dir,
+ ipv4_tpl=route_tpl.get("ipv4"),
+ ipv6_tpl=route_tpl.get("ipv6"),
+ )
+ self.iface_fn_tpl = self.templates.get("iface_templates")
self.kind = kind
self._iface_name = iface_name
- self._conf['DEVICE'] = iface_name
+ self._conf["DEVICE"] = iface_name
self._base_sysconf_dir = base_sysconf_dir
@property
@@ -248,7 +295,7 @@ class NetInterface(ConfigMap):
@name.setter
def name(self, iface_name):
self._iface_name = iface_name
- self._conf['DEVICE'] = iface_name
+ self._conf["DEVICE"] = iface_name
@property
def kind(self):
@@ -259,16 +306,18 @@ class NetInterface(ConfigMap):
if kind not in self.iface_types:
raise ValueError(kind)
self._kind = kind
- self._conf['TYPE'] = self.iface_types[kind]
+ self._conf["TYPE"] = self.iface_types[kind]
@property
def path(self):
- return self.iface_fn_tpl % ({'base': self._base_sysconf_dir,
- 'name': self.name})
+ return self.iface_fn_tpl % (
+ {"base": self._base_sysconf_dir, "name": self.name}
+ )
def copy(self, copy_children=False, copy_routes=False):
- c = NetInterface(self.name, self._base_sysconf_dir,
- self.templates, kind=self._kind)
+ c = NetInterface(
+ self.name, self._base_sysconf_dir, self.templates, kind=self._kind
+ )
c._conf = self._conf.copy()
if copy_children:
c.children = list(self.children)
@@ -277,7 +326,7 @@ class NetInterface(ConfigMap):
return c
def skip_key_value(self, key, val):
- if key == 'TYPE' and val == 'Vlan':
+ if key == "TYPE" and val == "Vlan":
return True
return False
@@ -291,166 +340,180 @@ class Renderer(renderer.Renderer):
# details about this)
iface_defaults = {
- 'rhel': {'ONBOOT': True, 'USERCTL': False, 'NM_CONTROLLED': False,
- 'BOOTPROTO': 'none'},
- 'suse': {'BOOTPROTO': 'static', 'STARTMODE': 'auto'},
+ "rhel": {
+ "ONBOOT": True,
+ "USERCTL": False,
+ "NM_CONTROLLED": False,
+ "BOOTPROTO": "none",
+ },
+ "suse": {"BOOTPROTO": "static", "STARTMODE": "auto"},
}
cfg_key_maps = {
- 'rhel': {
- 'accept-ra': 'IPV6_FORCE_ACCEPT_RA',
- 'bridge_stp': 'STP',
- 'bridge_ageing': 'AGEING',
- 'bridge_bridgeprio': 'PRIO',
- 'mac_address': 'HWADDR',
- 'mtu': 'MTU',
+ "rhel": {
+ "accept-ra": "IPV6_FORCE_ACCEPT_RA",
+ "bridge_stp": "STP",
+ "bridge_ageing": "AGEING",
+ "bridge_bridgeprio": "PRIO",
+ "mac_address": "HWADDR",
+ "mtu": "MTU",
},
- 'suse': {
- 'bridge_stp': 'BRIDGE_STP',
- 'bridge_ageing': 'BRIDGE_AGEINGTIME',
- 'bridge_bridgeprio': 'BRIDGE_PRIORITY',
- 'mac_address': 'LLADDR',
- 'mtu': 'MTU',
+ "suse": {
+ "bridge_stp": "BRIDGE_STP",
+ "bridge_ageing": "BRIDGE_AGEINGTIME",
+ "bridge_bridgeprio": "BRIDGE_PRIORITY",
+ "mac_address": "LLADDR",
+ "mtu": "MTU",
},
}
# If these keys exist, then their values will be used to form
# a BONDING_OPTS / BONDING_MODULE_OPTS grouping; otherwise no
# grouping will be set.
- bond_tpl_opts = tuple([
- ('bond_mode', "mode=%s"),
- ('bond_xmit_hash_policy', "xmit_hash_policy=%s"),
- ('bond_miimon', "miimon=%s"),
- ('bond_min_links', "min_links=%s"),
- ('bond_arp_interval', "arp_interval=%s"),
- ('bond_arp_ip_target', "arp_ip_target=%s"),
- ('bond_arp_validate', "arp_validate=%s"),
- ('bond_ad_select', "ad_select=%s"),
- ('bond_num_grat_arp', "num_grat_arp=%s"),
- ('bond_downdelay', "downdelay=%s"),
- ('bond_updelay', "updelay=%s"),
- ('bond_lacp_rate', "lacp_rate=%s"),
- ('bond_fail_over_mac', "fail_over_mac=%s"),
- ('bond_primary', "primary=%s"),
- ('bond_primary_reselect', "primary_reselect=%s"),
- ])
+ bond_tpl_opts = tuple(
+ [
+ ("bond_mode", "mode=%s"),
+ ("bond_xmit_hash_policy", "xmit_hash_policy=%s"),
+ ("bond_miimon", "miimon=%s"),
+ ("bond_min_links", "min_links=%s"),
+ ("bond_arp_interval", "arp_interval=%s"),
+ ("bond_arp_ip_target", "arp_ip_target=%s"),
+ ("bond_arp_validate", "arp_validate=%s"),
+ ("bond_ad_select", "ad_select=%s"),
+ ("bond_num_grat_arp", "num_grat_arp=%s"),
+ ("bond_downdelay", "downdelay=%s"),
+ ("bond_updelay", "updelay=%s"),
+ ("bond_lacp_rate", "lacp_rate=%s"),
+ ("bond_fail_over_mac", "fail_over_mac=%s"),
+ ("bond_primary", "primary=%s"),
+ ("bond_primary_reselect", "primary_reselect=%s"),
+ ]
+ )
templates = {}
def __init__(self, config=None):
if not config:
config = {}
- self.sysconf_dir = config.get('sysconf_dir', 'etc/sysconfig')
+ self.sysconf_dir = config.get("sysconf_dir", "etc/sysconfig")
self.netrules_path = config.get(
- 'netrules_path', 'etc/udev/rules.d/70-persistent-net.rules')
- self.dns_path = config.get('dns_path', 'etc/resolv.conf')
- nm_conf_path = 'etc/NetworkManager/conf.d/99-cloud-init.conf'
- self.networkmanager_conf_path = config.get('networkmanager_conf_path',
- nm_conf_path)
+ "netrules_path", "etc/udev/rules.d/70-persistent-net.rules"
+ )
+ self.dns_path = config.get("dns_path", "etc/resolv.conf")
+ nm_conf_path = "etc/NetworkManager/conf.d/99-cloud-init.conf"
+ self.networkmanager_conf_path = config.get(
+ "networkmanager_conf_path", nm_conf_path
+ )
self.templates = {
- 'control': config.get('control'),
- 'iface_templates': config.get('iface_templates'),
- 'route_templates': config.get('route_templates'),
+ "control": config.get("control"),
+ "iface_templates": config.get("iface_templates"),
+ "route_templates": config.get("route_templates"),
}
- self.flavor = config.get('flavor', 'rhel')
+ self.flavor = config.get("flavor", "rhel")
@classmethod
def _render_iface_shared(cls, iface, iface_cfg, flavor):
flavor_defaults = copy.deepcopy(cls.iface_defaults.get(flavor, {}))
iface_cfg.update(flavor_defaults)
- for old_key in ('mac_address', 'mtu', 'accept-ra'):
+ for old_key in ("mac_address", "mtu", "accept-ra"):
old_value = iface.get(old_key)
if old_value is not None:
# only set HWADDR on physical interfaces
- if (old_key == 'mac_address' and
- iface['type'] not in ['physical', 'infiniband']):
+ if old_key == "mac_address" and iface["type"] not in [
+ "physical",
+ "infiniband",
+ ]:
continue
new_key = cls.cfg_key_maps[flavor].get(old_key)
if new_key:
iface_cfg[new_key] = old_value
# only set WakeOnLan for physical interfaces
- if ('wakeonlan' in iface and iface['wakeonlan'] and
- iface['type'] == 'physical'):
- iface_cfg['ETHTOOL_OPTS'] = 'wol g'
+ if (
+ "wakeonlan" in iface
+ and iface["wakeonlan"]
+ and iface["type"] == "physical"
+ ):
+ iface_cfg["ETHTOOL_OPTS"] = "wol g"
@classmethod
def _render_subnets(cls, iface_cfg, subnets, has_default_route, flavor):
# setting base values
- if flavor == 'suse':
- iface_cfg['BOOTPROTO'] = 'static'
- if 'BRIDGE' in iface_cfg:
- iface_cfg['BOOTPROTO'] = 'dhcp'
- iface_cfg.drop('BRIDGE')
+ if flavor == "suse":
+ iface_cfg["BOOTPROTO"] = "static"
+ if "BRIDGE" in iface_cfg:
+ iface_cfg["BOOTPROTO"] = "dhcp"
+ iface_cfg.drop("BRIDGE")
else:
- iface_cfg['BOOTPROTO'] = 'none'
+ iface_cfg["BOOTPROTO"] = "none"
# modifying base values according to subnets
for i, subnet in enumerate(subnets, start=len(iface_cfg.children)):
- mtu_key = 'MTU'
- subnet_type = subnet.get('type')
- if subnet_type == 'dhcp6' or subnet_type == 'ipv6_dhcpv6-stateful':
- if flavor == 'suse':
+ mtu_key = "MTU"
+ subnet_type = subnet.get("type")
+ if subnet_type == "dhcp6" or subnet_type == "ipv6_dhcpv6-stateful":
+ if flavor == "suse":
# User wants dhcp for both protocols
- if iface_cfg['BOOTPROTO'] == 'dhcp4':
- iface_cfg['BOOTPROTO'] = 'dhcp'
+ if iface_cfg["BOOTPROTO"] == "dhcp4":
+ iface_cfg["BOOTPROTO"] = "dhcp"
else:
# Only IPv6 is DHCP, IPv4 may be static
- iface_cfg['BOOTPROTO'] = 'dhcp6'
- iface_cfg['DHCLIENT6_MODE'] = 'managed'
+ iface_cfg["BOOTPROTO"] = "dhcp6"
+ iface_cfg["DHCLIENT6_MODE"] = "managed"
# only if rhel AND dhcpv6 stateful
- elif (flavor == 'rhel' and
- subnet_type == 'ipv6_dhcpv6-stateful'):
- iface_cfg['BOOTPROTO'] = 'dhcp'
- iface_cfg['DHCPV6C'] = True
- iface_cfg['IPV6INIT'] = True
- iface_cfg['IPV6_AUTOCONF'] = False
+ elif (
+ flavor == "rhel" and subnet_type == "ipv6_dhcpv6-stateful"
+ ):
+ iface_cfg["BOOTPROTO"] = "dhcp"
+ iface_cfg["DHCPV6C"] = True
+ iface_cfg["IPV6INIT"] = True
+ iface_cfg["IPV6_AUTOCONF"] = False
else:
- iface_cfg['IPV6INIT'] = True
+ iface_cfg["IPV6INIT"] = True
# Configure network settings using DHCPv6
- iface_cfg['DHCPV6C'] = True
- elif subnet_type == 'ipv6_dhcpv6-stateless':
- if flavor == 'suse':
+ iface_cfg["DHCPV6C"] = True
+ elif subnet_type == "ipv6_dhcpv6-stateless":
+ if flavor == "suse":
# User wants dhcp for both protocols
- if iface_cfg['BOOTPROTO'] == 'dhcp4':
- iface_cfg['BOOTPROTO'] = 'dhcp'
+ if iface_cfg["BOOTPROTO"] == "dhcp4":
+ iface_cfg["BOOTPROTO"] = "dhcp"
else:
# Only IPv6 is DHCP, IPv4 may be static
- iface_cfg['BOOTPROTO'] = 'dhcp6'
- iface_cfg['DHCLIENT6_MODE'] = 'info'
+ iface_cfg["BOOTPROTO"] = "dhcp6"
+ iface_cfg["DHCLIENT6_MODE"] = "info"
else:
- iface_cfg['IPV6INIT'] = True
+ iface_cfg["IPV6INIT"] = True
# Configure network settings using SLAAC from RAs and
# optional info from dhcp server using DHCPv6
- iface_cfg['IPV6_AUTOCONF'] = True
- iface_cfg['DHCPV6C'] = True
+ iface_cfg["IPV6_AUTOCONF"] = True
+ iface_cfg["DHCPV6C"] = True
# Use Information-request to get only stateless
# configuration parameters (i.e., without address).
- iface_cfg['DHCPV6C_OPTIONS'] = '-S'
- elif subnet_type == 'ipv6_slaac':
- if flavor == 'suse':
+ iface_cfg["DHCPV6C_OPTIONS"] = "-S"
+ elif subnet_type == "ipv6_slaac":
+ if flavor == "suse":
# User wants dhcp for both protocols
- if iface_cfg['BOOTPROTO'] == 'dhcp4':
- iface_cfg['BOOTPROTO'] = 'dhcp'
+ if iface_cfg["BOOTPROTO"] == "dhcp4":
+ iface_cfg["BOOTPROTO"] = "dhcp"
else:
# Only IPv6 is DHCP, IPv4 may be static
- iface_cfg['BOOTPROTO'] = 'dhcp6'
- iface_cfg['DHCLIENT6_MODE'] = 'info'
+ iface_cfg["BOOTPROTO"] = "dhcp6"
+ iface_cfg["DHCLIENT6_MODE"] = "info"
else:
- iface_cfg['IPV6INIT'] = True
+ iface_cfg["IPV6INIT"] = True
# Configure network settings using SLAAC from RAs
- iface_cfg['IPV6_AUTOCONF'] = True
- elif subnet_type in ['dhcp4', 'dhcp']:
- bootproto_in = iface_cfg['BOOTPROTO']
- iface_cfg['BOOTPROTO'] = 'dhcp'
- if flavor == 'suse' and subnet_type == 'dhcp4':
+ iface_cfg["IPV6_AUTOCONF"] = True
+ elif subnet_type in ["dhcp4", "dhcp"]:
+ bootproto_in = iface_cfg["BOOTPROTO"]
+ iface_cfg["BOOTPROTO"] = "dhcp"
+ if flavor == "suse" and subnet_type == "dhcp4":
# If dhcp6 is already specified the user wants dhcp
# for both protocols
- if bootproto_in != 'dhcp6':
+ if bootproto_in != "dhcp6":
# Only IPv4 is DHCP, IPv6 may be static
- iface_cfg['BOOTPROTO'] = 'dhcp4'
- elif subnet_type in ['static', 'static6']:
+ iface_cfg["BOOTPROTO"] = "dhcp4"
+ elif subnet_type in ["static", "static6"]:
# RH info
# grep BOOTPROTO sysconfig.txt -A2 | head -3
# BOOTPROTO=none|bootp|dhcp
@@ -458,169 +521,184 @@ class Renderer(renderer.Renderer):
# to run on the device. Any other
# value causes any static configuration
# in the file to be applied.
- if subnet_is_ipv6(subnet) and flavor != 'suse':
- mtu_key = 'IPV6_MTU'
- iface_cfg['IPV6INIT'] = True
- if 'mtu' in subnet:
- mtu_mismatch = bool(mtu_key in iface_cfg and
- subnet['mtu'] != iface_cfg[mtu_key])
+ if subnet_is_ipv6(subnet) and flavor != "suse":
+ mtu_key = "IPV6_MTU"
+ iface_cfg["IPV6INIT"] = True
+ if "mtu" in subnet:
+ mtu_mismatch = bool(
+ mtu_key in iface_cfg
+ and subnet["mtu"] != iface_cfg[mtu_key]
+ )
if mtu_mismatch:
LOG.warning(
- 'Network config: ignoring %s device-level mtu:%s'
- ' because ipv4 subnet-level mtu:%s provided.',
- iface_cfg.name, iface_cfg[mtu_key], subnet['mtu'])
+ "Network config: ignoring %s device-level mtu:%s"
+ " because ipv4 subnet-level mtu:%s provided.",
+ iface_cfg.name,
+ iface_cfg[mtu_key],
+ subnet["mtu"],
+ )
if subnet_is_ipv6(subnet):
- if flavor == 'suse':
+ if flavor == "suse":
# TODO(rjschwei) write mtu setting to
# /etc/sysctl.d/
pass
else:
- iface_cfg[mtu_key] = subnet['mtu']
+ iface_cfg[mtu_key] = subnet["mtu"]
else:
- iface_cfg[mtu_key] = subnet['mtu']
+ iface_cfg[mtu_key] = subnet["mtu"]
- if subnet_is_ipv6(subnet) and flavor == 'rhel':
- iface_cfg['IPV6_FORCE_ACCEPT_RA'] = False
- iface_cfg['IPV6_AUTOCONF'] = False
- elif subnet_type == 'manual':
- if flavor == 'suse':
+ if subnet_is_ipv6(subnet) and flavor == "rhel":
+ iface_cfg["IPV6_FORCE_ACCEPT_RA"] = False
+ iface_cfg["IPV6_AUTOCONF"] = False
+ elif subnet_type == "manual":
+ if flavor == "suse":
LOG.debug('Unknown subnet type setting "%s"', subnet_type)
else:
# If the subnet has an MTU setting, then ONBOOT=True
# to apply the setting
- iface_cfg['ONBOOT'] = mtu_key in iface_cfg
+ iface_cfg["ONBOOT"] = mtu_key in iface_cfg
else:
- raise ValueError("Unknown subnet type '%s' found"
- " for interface '%s'" % (subnet_type,
- iface_cfg.name))
- if subnet.get('control') == 'manual':
- if flavor == 'suse':
- iface_cfg['STARTMODE'] = 'manual'
+ raise ValueError(
+ "Unknown subnet type '%s' found for interface '%s'"
+ % (subnet_type, iface_cfg.name)
+ )
+ if subnet.get("control") == "manual":
+ if flavor == "suse":
+ iface_cfg["STARTMODE"] = "manual"
else:
- iface_cfg['ONBOOT'] = False
+ iface_cfg["ONBOOT"] = False
# set IPv4 and IPv6 static addresses
ipv4_index = -1
ipv6_index = -1
for i, subnet in enumerate(subnets, start=len(iface_cfg.children)):
- subnet_type = subnet.get('type')
+ subnet_type = subnet.get("type")
# metric may apply to both dhcp and static config
- if 'metric' in subnet:
- if flavor != 'suse':
- iface_cfg['METRIC'] = subnet['metric']
- if subnet_type in ['dhcp', 'dhcp4']:
+ if "metric" in subnet:
+ if flavor != "suse":
+ iface_cfg["METRIC"] = subnet["metric"]
+ if subnet_type in ["dhcp", "dhcp4"]:
# On SUSE distros 'DHCLIENT_SET_DEFAULT_ROUTE' is a global
# setting in /etc/sysconfig/network/dhcp
- if flavor != 'suse':
- if has_default_route and iface_cfg['BOOTPROTO'] != 'none':
- iface_cfg['DHCLIENT_SET_DEFAULT_ROUTE'] = False
+ if flavor != "suse":
+ if has_default_route and iface_cfg["BOOTPROTO"] != "none":
+ iface_cfg["DHCLIENT_SET_DEFAULT_ROUTE"] = False
continue
elif subnet_type in IPV6_DYNAMIC_TYPES:
continue
- elif subnet_type in ['static', 'static6']:
+ elif subnet_type in ["static", "static6"]:
if subnet_is_ipv6(subnet):
ipv6_index = ipv6_index + 1
- ipv6_cidr = "%s/%s" % (subnet['address'], subnet['prefix'])
+ ipv6_cidr = "%s/%s" % (subnet["address"], subnet["prefix"])
if ipv6_index == 0:
- if flavor == 'suse':
- iface_cfg['IPADDR6'] = ipv6_cidr
+ if flavor == "suse":
+ iface_cfg["IPADDR6"] = ipv6_cidr
else:
- iface_cfg['IPV6ADDR'] = ipv6_cidr
+ iface_cfg["IPV6ADDR"] = ipv6_cidr
elif ipv6_index == 1:
- if flavor == 'suse':
- iface_cfg['IPADDR6_1'] = ipv6_cidr
+ if flavor == "suse":
+ iface_cfg["IPADDR6_1"] = ipv6_cidr
else:
- iface_cfg['IPV6ADDR_SECONDARIES'] = ipv6_cidr
+ iface_cfg["IPV6ADDR_SECONDARIES"] = ipv6_cidr
else:
- if flavor == 'suse':
- iface_cfg['IPADDR6_%d' % ipv6_index] = ipv6_cidr
+ if flavor == "suse":
+ iface_cfg["IPADDR6_%d" % ipv6_index] = ipv6_cidr
else:
- iface_cfg['IPV6ADDR_SECONDARIES'] += \
+ iface_cfg["IPV6ADDR_SECONDARIES"] += (
" " + ipv6_cidr
+ )
else:
ipv4_index = ipv4_index + 1
suff = "" if ipv4_index == 0 else str(ipv4_index)
- iface_cfg['IPADDR' + suff] = subnet['address']
- iface_cfg['NETMASK' + suff] = \
- net_prefix_to_ipv4_mask(subnet['prefix'])
-
- if 'gateway' in subnet and flavor != 'suse':
- iface_cfg['DEFROUTE'] = True
- if is_ipv6_addr(subnet['gateway']):
- iface_cfg['IPV6_DEFAULTGW'] = subnet['gateway']
+ iface_cfg["IPADDR" + suff] = subnet["address"]
+ iface_cfg["NETMASK" + suff] = net_prefix_to_ipv4_mask(
+ subnet["prefix"]
+ )
+
+ if "gateway" in subnet and flavor != "suse":
+ iface_cfg["DEFROUTE"] = True
+ if is_ipv6_addr(subnet["gateway"]):
+ iface_cfg["IPV6_DEFAULTGW"] = subnet["gateway"]
else:
- iface_cfg['GATEWAY'] = subnet['gateway']
+ iface_cfg["GATEWAY"] = subnet["gateway"]
- if 'dns_search' in subnet and flavor != 'suse':
- iface_cfg['DOMAIN'] = ' '.join(subnet['dns_search'])
+ if "dns_search" in subnet and flavor != "suse":
+ iface_cfg["DOMAIN"] = " ".join(subnet["dns_search"])
- if 'dns_nameservers' in subnet and flavor != 'suse':
- if len(subnet['dns_nameservers']) > 3:
+ if "dns_nameservers" in subnet and flavor != "suse":
+ if len(subnet["dns_nameservers"]) > 3:
# per resolv.conf(5) MAXNS sets this to 3.
- LOG.debug("%s has %d entries in dns_nameservers. "
- "Only 3 are used.", iface_cfg.name,
- len(subnet['dns_nameservers']))
- for i, k in enumerate(subnet['dns_nameservers'][:3], 1):
- iface_cfg['DNS' + str(i)] = k
+ LOG.debug(
+ "%s has %d entries in dns_nameservers. "
+ "Only 3 are used.",
+ iface_cfg.name,
+ len(subnet["dns_nameservers"]),
+ )
+ for i, k in enumerate(subnet["dns_nameservers"][:3], 1):
+ iface_cfg["DNS" + str(i)] = k
@classmethod
def _render_subnet_routes(cls, iface_cfg, route_cfg, subnets, flavor):
# TODO(rjschwei): route configuration on SUSE distro happens via
# ifroute-* files, see lp#1812117. SUSE currently carries a local
# patch in their package.
- if flavor == 'suse':
+ if flavor == "suse":
return
for _, subnet in enumerate(subnets, start=len(iface_cfg.children)):
- subnet_type = subnet.get('type')
- for route in subnet.get('routes', []):
- is_ipv6 = subnet.get('ipv6') or is_ipv6_addr(route['gateway'])
+ subnet_type = subnet.get("type")
+ for route in subnet.get("routes", []):
+ is_ipv6 = subnet.get("ipv6") or is_ipv6_addr(route["gateway"])
# Any dynamic configuration method, slaac, dhcpv6-stateful/
# stateless should get router information from router RA's.
- if (_is_default_route(route) and subnet_type not in
- IPV6_DYNAMIC_TYPES):
+ if (
+ _is_default_route(route)
+ and subnet_type not in IPV6_DYNAMIC_TYPES
+ ):
if (
- (subnet.get('ipv4') and
- route_cfg.has_set_default_ipv4) or
- (subnet.get('ipv6') and
- route_cfg.has_set_default_ipv6)
+ subnet.get("ipv4") and route_cfg.has_set_default_ipv4
+ ) or (
+ subnet.get("ipv6") and route_cfg.has_set_default_ipv6
):
- raise ValueError("Duplicate declaration of default "
- "route found for interface '%s'"
- % (iface_cfg.name))
+ raise ValueError(
+ "Duplicate declaration of default "
+ "route found for interface '%s'" % (iface_cfg.name)
+ )
# NOTE(harlowja): ipv6 and ipv4 default gateways
- gw_key = 'GATEWAY0'
- nm_key = 'NETMASK0'
- addr_key = 'ADDRESS0'
+ gw_key = "GATEWAY0"
+ nm_key = "NETMASK0"
+ addr_key = "ADDRESS0"
# The owning interface provides the default route.
#
# TODO(harlowja): add validation that no other iface has
# also provided the default route?
- iface_cfg['DEFROUTE'] = True
- if iface_cfg['BOOTPROTO'] in ('dhcp', 'dhcp4'):
- iface_cfg['DHCLIENT_SET_DEFAULT_ROUTE'] = True
- if 'gateway' in route:
+ iface_cfg["DEFROUTE"] = True
+ if iface_cfg["BOOTPROTO"] in ("dhcp", "dhcp4"):
+ iface_cfg["DHCLIENT_SET_DEFAULT_ROUTE"] = True
+ if "gateway" in route:
if is_ipv6:
- iface_cfg['IPV6_DEFAULTGW'] = route['gateway']
+ iface_cfg["IPV6_DEFAULTGW"] = route["gateway"]
route_cfg.has_set_default_ipv6 = True
else:
- iface_cfg['GATEWAY'] = route['gateway']
+ iface_cfg["GATEWAY"] = route["gateway"]
route_cfg.has_set_default_ipv4 = True
- if 'metric' in route:
- iface_cfg['METRIC'] = route['metric']
+ if "metric" in route:
+ iface_cfg["METRIC"] = route["metric"]
else:
- gw_key = 'GATEWAY%s' % route_cfg.last_idx
- nm_key = 'NETMASK%s' % route_cfg.last_idx
- addr_key = 'ADDRESS%s' % route_cfg.last_idx
- metric_key = 'METRIC%s' % route_cfg.last_idx
+ gw_key = "GATEWAY%s" % route_cfg.last_idx
+ nm_key = "NETMASK%s" % route_cfg.last_idx
+ addr_key = "ADDRESS%s" % route_cfg.last_idx
+ metric_key = "METRIC%s" % route_cfg.last_idx
route_cfg.last_idx += 1
# add default routes only to ifcfg files, not
# to route-* or route6-*
- for (old_key, new_key) in [('gateway', gw_key),
- ('metric', metric_key),
- ('netmask', nm_key),
- ('network', addr_key)]:
+ for (old_key, new_key) in [
+ ("gateway", gw_key),
+ ("metric", metric_key),
+ ("netmask", nm_key),
+ ("network", addr_key),
+ ]:
if old_key in route:
route_cfg[new_key] = route[old_key]
@@ -638,33 +716,35 @@ class Renderer(renderer.Renderer):
bond_opts.append(value_tpl % (bond_value))
break
if bond_opts:
- if flavor == 'suse':
+ if flavor == "suse":
# suse uses the sysconfig support which requires
# BONDING_MODULE_OPTS see
# https://www.kernel.org/doc/Documentation/networking/bonding.txt
# 3.1 Configuration with Sysconfig Support
- iface_cfg['BONDING_MODULE_OPTS'] = " ".join(bond_opts)
+ iface_cfg["BONDING_MODULE_OPTS"] = " ".join(bond_opts)
else:
# rhel uses initscript support and thus requires BONDING_OPTS
# this is also the old default see
# https://www.kernel.org/doc/Documentation/networking/bonding.txt
# 3.2 Configuration with Initscripts Support
- iface_cfg['BONDING_OPTS'] = " ".join(bond_opts)
+ iface_cfg["BONDING_OPTS"] = " ".join(bond_opts)
@classmethod
def _render_physical_interfaces(
- cls, network_state, iface_contents, flavor
+ cls, network_state, iface_contents, flavor
):
physical_filter = renderer.filter_by_physical
for iface in network_state.iter_interfaces(physical_filter):
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_subnets = iface.get("subnets", [])
iface_cfg = iface_contents[iface_name]
route_cfg = iface_cfg.routes
cls._render_subnets(
- iface_cfg, iface_subnets, network_state.has_default_route,
- flavor
+ iface_cfg,
+ iface_subnets,
+ network_state.has_default_route,
+ flavor,
)
cls._render_subnet_routes(
iface_cfg, route_cfg, iface_subnets, flavor
@@ -672,10 +752,10 @@ class Renderer(renderer.Renderer):
@classmethod
def _render_bond_interfaces(cls, network_state, iface_contents, flavor):
- bond_filter = renderer.filter_by_type('bond')
- slave_filter = renderer.filter_by_attr('bond-master')
+ bond_filter = renderer.filter_by_type("bond")
+ slave_filter = renderer.filter_by_attr("bond-master")
for iface in network_state.iter_interfaces(bond_filter):
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_cfg = iface_contents[iface_name]
cls._render_bonding_opts(iface_cfg, iface, flavor)
@@ -684,21 +764,23 @@ class Renderer(renderer.Renderer):
master_cfgs = [iface_cfg]
master_cfgs.extend(iface_cfg.children)
for master_cfg in master_cfgs:
- master_cfg['BONDING_MASTER'] = True
- if flavor != 'suse':
- master_cfg.kind = 'bond'
+ master_cfg["BONDING_MASTER"] = True
+ if flavor != "suse":
+ master_cfg.kind = "bond"
- if iface.get('mac_address'):
- if flavor == 'suse':
- iface_cfg['LLADDR'] = iface.get('mac_address')
+ if iface.get("mac_address"):
+ if flavor == "suse":
+ iface_cfg["LLADDR"] = iface.get("mac_address")
else:
- iface_cfg['MACADDR'] = iface.get('mac_address')
+ iface_cfg["MACADDR"] = iface.get("mac_address")
iface_subnets = iface.get("subnets", [])
route_cfg = iface_cfg.routes
cls._render_subnets(
- iface_cfg, iface_subnets, network_state.has_default_route,
- flavor
+ iface_cfg,
+ iface_subnets,
+ network_state.has_default_route,
+ flavor,
)
cls._render_subnet_routes(
iface_cfg, route_cfg, iface_subnets, flavor
@@ -707,54 +789,64 @@ class Renderer(renderer.Renderer):
# iter_interfaces on network-state is not sorted to produce
# consistent numbers we need to sort.
bond_slaves = sorted(
- [slave_iface['name'] for slave_iface in
- network_state.iter_interfaces(slave_filter)
- if slave_iface['bond-master'] == iface_name])
+ [
+ slave_iface["name"]
+ for slave_iface in network_state.iter_interfaces(
+ slave_filter
+ )
+ if slave_iface["bond-master"] == iface_name
+ ]
+ )
for index, bond_slave in enumerate(bond_slaves):
- if flavor == 'suse':
- slavestr = 'BONDING_SLAVE_%s' % index
+ if flavor == "suse":
+ slavestr = "BONDING_SLAVE_%s" % index
else:
- slavestr = 'BONDING_SLAVE%s' % index
+ slavestr = "BONDING_SLAVE%s" % index
iface_cfg[slavestr] = bond_slave
slave_cfg = iface_contents[bond_slave]
- if flavor == 'suse':
- slave_cfg['BOOTPROTO'] = 'none'
- slave_cfg['STARTMODE'] = 'hotplug'
+ if flavor == "suse":
+ slave_cfg["BOOTPROTO"] = "none"
+ slave_cfg["STARTMODE"] = "hotplug"
else:
- slave_cfg['MASTER'] = iface_name
- slave_cfg['SLAVE'] = True
+ slave_cfg["MASTER"] = iface_name
+ slave_cfg["SLAVE"] = True
@classmethod
def _render_vlan_interfaces(cls, network_state, iface_contents, flavor):
- vlan_filter = renderer.filter_by_type('vlan')
+ vlan_filter = renderer.filter_by_type("vlan")
for iface in network_state.iter_interfaces(vlan_filter):
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_cfg = iface_contents[iface_name]
- if flavor == 'suse':
- vlan_id = iface.get('vlan_id')
+ if flavor == "suse":
+ vlan_id = iface.get("vlan_id")
if vlan_id:
- iface_cfg['VLAN_ID'] = vlan_id
- iface_cfg['ETHERDEVICE'] = iface_name[:iface_name.rfind('.')]
+ iface_cfg["VLAN_ID"] = vlan_id
+ iface_cfg["ETHERDEVICE"] = iface_name[: iface_name.rfind(".")]
else:
- iface_cfg['VLAN'] = True
- iface_cfg.kind = 'vlan'
+ iface_cfg["VLAN"] = True
+ iface_cfg.kind = "vlan"
- rdev = iface['vlan-raw-device']
- supported = _supported_vlan_names(rdev, iface['vlan_id'])
+ rdev = iface["vlan-raw-device"]
+ supported = _supported_vlan_names(rdev, iface["vlan_id"])
if iface_name not in supported:
LOG.info(
"Name '%s' for vlan '%s' is not officially supported"
"by RHEL. Supported: %s",
- iface_name, rdev, ' '.join(supported))
- iface_cfg['PHYSDEV'] = rdev
+ iface_name,
+ rdev,
+ " ".join(supported),
+ )
+ iface_cfg["PHYSDEV"] = rdev
iface_subnets = iface.get("subnets", [])
route_cfg = iface_cfg.routes
cls._render_subnets(
- iface_cfg, iface_subnets, network_state.has_default_route,
- flavor
+ iface_cfg,
+ iface_subnets,
+ network_state.has_default_route,
+ flavor,
)
cls._render_subnet_routes(
iface_cfg, route_cfg, iface_subnets, flavor
@@ -763,8 +855,12 @@ class Renderer(renderer.Renderer):
@staticmethod
def _render_dns(network_state, existing_dns_path=None):
# skip writing resolv.conf if network_state doesn't include any input.
- if not any([len(network_state.dns_nameservers),
- len(network_state.dns_searchdomains)]):
+ if not any(
+ [
+ len(network_state.dns_nameservers),
+ len(network_state.dns_searchdomains),
+ ]
+ ):
return None
content = resolv_conf.ResolvConf("")
if existing_dns_path and os.path.isfile(existing_dns_path):
@@ -773,10 +869,10 @@ class Renderer(renderer.Renderer):
content.add_nameserver(nameserver)
for searchdomain in network_state.dns_searchdomains:
content.add_search_domain(searchdomain)
- header = _make_header(';')
+ header = _make_header(";")
content_str = str(content)
if not content_str.startswith(header):
- content_str = header + '\n' + content_str
+ content_str = header + "\n" + content_str
return content_str
@staticmethod
@@ -787,7 +883,7 @@ class Renderer(renderer.Renderer):
# NetworkManager to not manage dns, so that /etc/resolv.conf
# does not get clobbered.
if network_state.dns_nameservers:
- content.set_section_keypair('main', 'dns', 'none')
+ content.set_section_keypair("main", "dns", "none")
if len(content) == 0:
return None
@@ -797,39 +893,41 @@ class Renderer(renderer.Renderer):
@classmethod
def _render_bridge_interfaces(cls, network_state, iface_contents, flavor):
bridge_key_map = {
- old_k: new_k for old_k, new_k in cls.cfg_key_maps[flavor].items()
- if old_k.startswith('bridge')}
- bridge_filter = renderer.filter_by_type('bridge')
+ old_k: new_k
+ for old_k, new_k in cls.cfg_key_maps[flavor].items()
+ if old_k.startswith("bridge")
+ }
+ bridge_filter = renderer.filter_by_type("bridge")
for iface in network_state.iter_interfaces(bridge_filter):
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_cfg = iface_contents[iface_name]
- if flavor != 'suse':
- iface_cfg.kind = 'bridge'
+ if flavor != "suse":
+ iface_cfg.kind = "bridge"
for old_key, new_key in bridge_key_map.items():
if old_key in iface:
iface_cfg[new_key] = iface[old_key]
- if flavor == 'suse':
- if 'BRIDGE_STP' in iface_cfg:
- if iface_cfg.get('BRIDGE_STP'):
- iface_cfg['BRIDGE_STP'] = 'on'
+ if flavor == "suse":
+ if "BRIDGE_STP" in iface_cfg:
+ if iface_cfg.get("BRIDGE_STP"):
+ iface_cfg["BRIDGE_STP"] = "on"
else:
- iface_cfg['BRIDGE_STP'] = 'off'
-
- if iface.get('mac_address'):
- key = 'MACADDR'
- if flavor == 'suse':
- key = 'LLADDRESS'
- iface_cfg[key] = iface.get('mac_address')
-
- if flavor == 'suse':
- if iface.get('bridge_ports', []):
- iface_cfg['BRIDGE_PORTS'] = '%s' % " ".join(
- iface.get('bridge_ports')
+ iface_cfg["BRIDGE_STP"] = "off"
+
+ if iface.get("mac_address"):
+ key = "MACADDR"
+ if flavor == "suse":
+ key = "LLADDRESS"
+ iface_cfg[key] = iface.get("mac_address")
+
+ if flavor == "suse":
+ if iface.get("bridge_ports", []):
+ iface_cfg["BRIDGE_PORTS"] = "%s" % " ".join(
+ iface.get("bridge_ports")
)
# Is this the right key to get all the connected interfaces?
- for bridged_iface_name in iface.get('bridge_ports', []):
+ for bridged_iface_name in iface.get("bridge_ports", []):
# Ensure all bridged interfaces are correctly tagged
# as being bridged to this interface.
bridged_cfg = iface_contents[bridged_iface_name]
@@ -837,15 +935,17 @@ class Renderer(renderer.Renderer):
bridged_cfgs.extend(bridged_cfg.children)
for bridge_cfg in bridged_cfgs:
bridge_value = iface_name
- if flavor == 'suse':
- bridge_value = 'yes'
- bridge_cfg['BRIDGE'] = bridge_value
+ if flavor == "suse":
+ bridge_value = "yes"
+ bridge_cfg["BRIDGE"] = bridge_value
iface_subnets = iface.get("subnets", [])
route_cfg = iface_cfg.routes
cls._render_subnets(
- iface_cfg, iface_subnets, network_state.has_default_route,
- flavor
+ iface_cfg,
+ iface_subnets,
+ network_state.has_default_route,
+ flavor,
)
cls._render_subnet_routes(
iface_cfg, route_cfg, iface_subnets, flavor
@@ -853,37 +953,40 @@ class Renderer(renderer.Renderer):
@classmethod
def _render_ib_interfaces(cls, network_state, iface_contents, flavor):
- ib_filter = renderer.filter_by_type('infiniband')
+ ib_filter = renderer.filter_by_type("infiniband")
for iface in network_state.iter_interfaces(ib_filter):
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_cfg = iface_contents[iface_name]
- iface_cfg.kind = 'infiniband'
+ iface_cfg.kind = "infiniband"
iface_subnets = iface.get("subnets", [])
route_cfg = iface_cfg.routes
cls._render_subnets(
- iface_cfg, iface_subnets, network_state.has_default_route,
- flavor
+ iface_cfg,
+ iface_subnets,
+ network_state.has_default_route,
+ flavor,
)
cls._render_subnet_routes(
iface_cfg, route_cfg, iface_subnets, flavor
)
@classmethod
- def _render_sysconfig(cls, base_sysconf_dir, network_state, flavor,
- templates=None):
- '''Given state, return /etc/sysconfig files + contents'''
+ def _render_sysconfig(
+ cls, base_sysconf_dir, network_state, flavor, templates=None
+ ):
+ """Given state, return /etc/sysconfig files + contents"""
if not templates:
templates = cls.templates
iface_contents = {}
for iface in network_state.iter_interfaces():
- if iface['type'] == "loopback":
+ if iface["type"] == "loopback":
continue
- iface_name = iface['name']
+ iface_name = iface["name"]
iface_cfg = NetInterface(iface_name, base_sysconf_dir, templates)
- if flavor == 'suse':
- iface_cfg.drop('DEVICE')
+ if flavor == "suse":
+ iface_cfg.drop("DEVICE")
# If type detection fails it is considered a bug in SUSE
- iface_cfg.drop('TYPE')
+ iface_cfg.drop("TYPE")
cls._render_iface_shared(iface, iface_cfg, flavor)
iface_contents[iface_name] = iface_cfg
cls._render_physical_interfaces(network_state, iface_contents, flavor)
@@ -899,9 +1002,10 @@ class Renderer(renderer.Renderer):
if iface_cfg:
contents[iface_cfg.path] = iface_cfg.to_string()
if iface_cfg.routes:
- for cpath, proto in zip([iface_cfg.routes.path_ipv4,
- iface_cfg.routes.path_ipv6],
- ["ipv4", "ipv6"]):
+ for cpath, proto in zip(
+ [iface_cfg.routes.path_ipv4, iface_cfg.routes.path_ipv6],
+ ["ipv4", "ipv6"],
+ ):
if cpath not in contents:
contents[cpath] = iface_cfg.routes.to_string(proto)
return contents
@@ -911,21 +1015,24 @@ class Renderer(renderer.Renderer):
templates = self.templates
file_mode = 0o644
base_sysconf_dir = subp.target_path(target, self.sysconf_dir)
- for path, data in self._render_sysconfig(base_sysconf_dir,
- network_state, self.flavor,
- templates=templates).items():
+ for path, data in self._render_sysconfig(
+ base_sysconf_dir, network_state, self.flavor, templates=templates
+ ).items():
util.write_file(path, data, file_mode)
if self.dns_path:
dns_path = subp.target_path(target, self.dns_path)
- resolv_content = self._render_dns(network_state,
- existing_dns_path=dns_path)
+ resolv_content = self._render_dns(
+ network_state, existing_dns_path=dns_path
+ )
if resolv_content:
util.write_file(dns_path, resolv_content, file_mode)
if self.networkmanager_conf_path:
- nm_conf_path = subp.target_path(target,
- self.networkmanager_conf_path)
- nm_conf_content = self._render_networkmanager_conf(network_state,
- templates)
+ nm_conf_path = subp.target_path(
+ target, self.networkmanager_conf_path
+ )
+ nm_conf_content = self._render_networkmanager_conf(
+ network_state, templates
+ )
if nm_conf_content:
util.write_file(nm_conf_path, nm_conf_content, file_mode)
if self.netrules_path:
@@ -933,20 +1040,19 @@ class Renderer(renderer.Renderer):
netrules_path = subp.target_path(target, self.netrules_path)
util.write_file(netrules_path, netrules_content, file_mode)
if available_nm(target=target):
- enable_ifcfg_rh(subp.target_path(
- target, path=NM_CFG_FILE
- ))
+ enable_ifcfg_rh(subp.target_path(target, path=NM_CFG_FILE))
- sysconfig_path = subp.target_path(target, templates.get('control'))
+ sysconfig_path = subp.target_path(target, templates.get("control"))
# Distros configuring /etc/sysconfig/network as a file e.g. Centos
- if sysconfig_path.endswith('network'):
+ if sysconfig_path.endswith("network"):
util.ensure_dir(os.path.dirname(sysconfig_path))
- netcfg = [_make_header(), 'NETWORKING=yes']
+ netcfg = [_make_header(), "NETWORKING=yes"]
if network_state.use_ipv6:
- netcfg.append('NETWORKING_IPV6=yes')
- netcfg.append('IPV6_AUTOCONF=no')
- util.write_file(sysconfig_path,
- "\n".join(netcfg) + "\n", file_mode)
+ netcfg.append("NETWORKING_IPV6=yes")
+ netcfg.append("IPV6_AUTOCONF=no")
+ util.write_file(
+ sysconfig_path, "\n".join(netcfg) + "\n", file_mode
+ )
def _supported_vlan_names(rdev, vid):
@@ -954,27 +1060,34 @@ def _supported_vlan_names(rdev, vid):
11.5. Naming Scheme for VLAN Interfaces."""
return [
v.format(rdev=rdev, vid=int(vid))
- for v in ("{rdev}{vid:04}", "{rdev}{vid}",
- "{rdev}.{vid:04}", "{rdev}.{vid}")]
+ for v in (
+ "{rdev}{vid:04}",
+ "{rdev}{vid}",
+ "{rdev}.{vid:04}",
+ "{rdev}.{vid}",
+ )
+ ]
def available(target=None):
sysconfig = available_sysconfig(target=target)
nm = available_nm(target=target)
- return (util.system_info()['variant'] in KNOWN_DISTROS
- and any([nm, sysconfig]))
+ return util.system_info()["variant"] in KNOWN_DISTROS and any(
+ [nm, sysconfig]
+ )
def available_sysconfig(target=None):
- expected = ['ifup', 'ifdown']
- search = ['/sbin', '/usr/sbin']
+ expected = ["ifup", "ifdown"]
+ search = ["/sbin", "/usr/sbin"]
for p in expected:
if not subp.which(p, search=search, target=target):
return False
expected_paths = [
- 'etc/sysconfig/network-scripts/network-functions',
- 'etc/sysconfig/config']
+ "etc/sysconfig/network-scripts/network-functions",
+ "etc/sysconfig/config",
+ ]
for p in expected_paths:
if os.path.isfile(subp.target_path(target, p)):
return True
@@ -982,10 +1095,7 @@ def available_sysconfig(target=None):
def available_nm(target=None):
- if not os.path.isfile(subp.target_path(
- target,
- path=NM_CFG_FILE
- )):
+ if not os.path.isfile(subp.target_path(target, path=NM_CFG_FILE)):
return False
return True
diff --git a/cloudinit/net/udev.py b/cloudinit/net/udev.py
index 58c0a708..b79e4426 100644
--- a/cloudinit/net/udev.py
+++ b/cloudinit/net/udev.py
@@ -32,15 +32,18 @@ def generate_udev_rule(interface, mac, driver=None):
ATTR{address}="ff:ee:dd:cc:bb:aa", NAME="eth0"
"""
if not driver:
- driver = '?*'
-
- rule = ', '.join([
- compose_udev_equality('SUBSYSTEM', 'net'),
- compose_udev_equality('ACTION', 'add'),
- compose_udev_equality('DRIVERS', driver),
- compose_udev_attr_equality('address', mac),
- compose_udev_setting('NAME', interface),
- ])
- return '%s\n' % rule
+ driver = "?*"
+
+ rule = ", ".join(
+ [
+ compose_udev_equality("SUBSYSTEM", "net"),
+ compose_udev_equality("ACTION", "add"),
+ compose_udev_equality("DRIVERS", driver),
+ compose_udev_attr_equality("address", mac),
+ compose_udev_setting("NAME", interface),
+ ]
+ )
+ return "%s\n" % rule
+
# vi: ts=4 expandtab syntax=python