summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/net/__init__.py131
-rw-r--r--cloudinit/net/tests/__init__.py0
-rw-r--r--cloudinit/net/tests/test_init.py522
3 files changed, 636 insertions, 17 deletions
diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py
index d1740e56..46cb9c85 100644
--- a/cloudinit/net/__init__.py
+++ b/cloudinit/net/__init__.py
@@ -10,6 +10,7 @@ import logging
import os
import re
+from cloudinit.net.network_state import mask_to_net_prefix
from cloudinit import util
LOG = logging.getLogger(__name__)
@@ -28,8 +29,13 @@ def _natural_sort_key(s, _nsre=re.compile('([0-9]+)')):
for text in re.split(_nsre, s)]
+def get_sys_class_path():
+ """Simple function to return the global SYS_CLASS_NET."""
+ return SYS_CLASS_NET
+
+
def sys_dev_path(devname, path=""):
- return SYS_CLASS_NET + devname + "/" + path
+ return get_sys_class_path() + devname + "/" + path
def read_sys_net(devname, path, translate=None,
@@ -77,7 +83,7 @@ def read_sys_net_int(iface, field):
return None
try:
return int(val)
- except TypeError:
+ except ValueError:
return None
@@ -149,7 +155,14 @@ def device_devid(devname):
def get_devicelist():
- return os.listdir(SYS_CLASS_NET)
+ try:
+ devs = os.listdir(get_sys_class_path())
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ devs = []
+ else:
+ raise
+ return devs
class ParserError(Exception):
@@ -497,14 +510,8 @@ def get_interfaces_by_mac():
"""Build a dictionary of tuples {mac: name}.
Bridges and any devices that have a 'stolen' mac are excluded."""
- try:
- devs = get_devicelist()
- except OSError as e:
- if e.errno == errno.ENOENT:
- devs = []
- else:
- raise
ret = {}
+ devs = get_devicelist()
empty_mac = '00:00:00:00:00:00'
for name in devs:
if not interface_has_own_mac(name):
@@ -531,14 +538,8 @@ def get_interfaces():
"""Return list of interface tuples (name, mac, driver, device_id)
Bridges and any devices that have a 'stolen' mac are excluded."""
- try:
- devs = get_devicelist()
- except OSError as e:
- if e.errno == errno.ENOENT:
- devs = []
- else:
- raise
ret = []
+ devs = get_devicelist()
empty_mac = '00:00:00:00:00:00'
for name in devs:
if not interface_has_own_mac(name):
@@ -557,6 +558,102 @@ def get_interfaces():
return ret
+class EphemeralIPv4Network(object):
+ """Context manager which sets up temporary static network configuration.
+
+ No operations are performed if the provided interface is already connected.
+ If unconnected, bring up the interface with valid ip, prefix and broadcast.
+ If router is provided setup a default route for that interface. Upon
+ context exit, clean up the interface leaving no configuration behind.
+ """
+
+ def __init__(self, interface, ip, prefix_or_mask, broadcast, router=None):
+ """Setup context manager and validate call signature.
+
+ @param interface: Name of the network interface to bring up.
+ @param ip: IP address to assign to the interface.
+ @param prefix_or_mask: Either netmask of the format X.X.X.X or an int
+ prefix.
+ @param broadcast: Broadcast address for the IPv4 network.
+ @param router: Optionally the default gateway IP.
+ """
+ if not all([interface, ip, prefix_or_mask, broadcast]):
+ raise ValueError(
+ 'Cannot init network on {0} with {1}/{2} and bcast {3}'.format(
+ interface, ip, prefix_or_mask, broadcast))
+ try:
+ self.prefix = mask_to_net_prefix(prefix_or_mask)
+ except ValueError as e:
+ raise ValueError(
+ 'Cannot setup network: {0}'.format(e))
+ self.interface = interface
+ self.ip = ip
+ self.broadcast = broadcast
+ self.router = router
+ self.cleanup_cmds = [] # List of commands to run to cleanup state.
+
+ def __enter__(self):
+ """Perform ephemeral network setup if interface is not connected."""
+ self._bringup_device()
+ if self.router:
+ self._bringup_router()
+
+ def __exit__(self, excp_type, excp_value, excp_traceback):
+ for cmd in self.cleanup_cmds:
+ util.subp(cmd, capture=True)
+
+ def _delete_address(self, address, prefix):
+ """Perform the ip command to remove the specified address."""
+ util.subp(
+ ['ip', '-family', 'inet', 'addr', 'del',
+ '%s/%s' % (address, prefix), 'dev', self.interface],
+ capture=True)
+
+ def _bringup_device(self):
+ """Perform the ip comands to fully setup the device."""
+ cidr = '{0}/{1}'.format(self.ip, self.prefix)
+ LOG.debug(
+ 'Attempting setup of ephemeral network on %s with %s brd %s',
+ self.interface, cidr, self.broadcast)
+ try:
+ util.subp(
+ ['ip', '-family', 'inet', 'addr', 'add', cidr, 'broadcast',
+ self.broadcast, 'dev', self.interface],
+ capture=True, update_env={'LANG': 'C'})
+ except util.ProcessExecutionError as e:
+ if "File exists" not in e.stderr:
+ raise
+ LOG.debug(
+ 'Skip ephemeral network setup, %s already has address %s',
+ self.interface, self.ip)
+ else:
+ # Address creation success, bring up device and queue cleanup
+ util.subp(
+ ['ip', '-family', 'inet', 'link', 'set', 'dev', self.interface,
+ 'up'], capture=True)
+ self.cleanup_cmds.append(
+ ['ip', '-family', 'inet', 'link', 'set', 'dev', self.interface,
+ 'down'])
+ self.cleanup_cmds.append(
+ ['ip', '-family', 'inet', 'addr', 'del', cidr, 'dev',
+ self.interface])
+
+ def _bringup_router(self):
+ """Perform the ip commands to fully setup the router if needed."""
+ # Check if a default route exists and exit if it does
+ out, _ = util.subp(['ip', 'route', 'show', '0.0.0.0/0'], capture=True)
+ if 'default' in out:
+ LOG.debug(
+ 'Skip ephemeral route setup. %s already has default route: %s',
+ self.interface, out.strip())
+ return
+ util.subp(
+ ['ip', '-4', 'route', 'add', 'default', 'via', self.router,
+ 'dev', self.interface], capture=True)
+ self.cleanup_cmds.insert(
+ 0, ['ip', '-4', 'route', 'del', 'default', 'dev', self.interface])
+
+
class RendererNotFoundError(RuntimeError):
pass
diff --git a/cloudinit/net/tests/__init__.py b/cloudinit/net/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/cloudinit/net/tests/__init__.py
diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py
new file mode 100644
index 00000000..272a6ebd
--- /dev/null
+++ b/cloudinit/net/tests/test_init.py
@@ -0,0 +1,522 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import errno
+import mock
+import os
+
+import cloudinit.net as net
+from cloudinit.util import ensure_file, write_file, ProcessExecutionError
+from tests.unittests.helpers import CiTestCase
+
+
+class TestSysDevPath(CiTestCase):
+
+ def test_sys_dev_path(self):
+ """sys_dev_path returns a path under SYS_CLASS_NET for a device."""
+ dev = 'something'
+ path = 'attribute'
+ expected = net.SYS_CLASS_NET + dev + '/' + path
+ self.assertEqual(expected, net.sys_dev_path(dev, path))
+
+ def test_sys_dev_path_without_path(self):
+ """When path param isn't provided it defaults to empty string."""
+ dev = 'something'
+ expected = net.SYS_CLASS_NET + dev + '/'
+ self.assertEqual(expected, net.sys_dev_path(dev))
+
+
+class TestReadSysNet(CiTestCase):
+ with_logs = True
+
+ def setUp(self):
+ super(TestReadSysNet, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_read_sys_net_strips_contents_of_sys_path(self):
+ """read_sys_net strips whitespace from the contents of a sys file."""
+ content = 'some stuff with trailing whitespace\t\r\n'
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), content)
+ self.assertEqual(content.strip(), net.read_sys_net('dev', 'attr'))
+
+ def test_read_sys_net_reraises_oserror(self):
+ """read_sys_net raises OSError/IOError when file doesn't exist."""
+ # Non-specific Exception because versions of python OSError vs IOError.
+ with self.assertRaises(Exception) as context_manager: # noqa: H202
+ net.read_sys_net('dev', 'attr')
+ error = context_manager.exception
+ self.assertIn('No such file or directory', str(error))
+
+ def test_read_sys_net_handles_error_with_on_enoent(self):
+ """read_sys_net handles OSError/IOError with on_enoent if provided."""
+ handled_errors = []
+
+ def on_enoent(e):
+ handled_errors.append(e)
+
+ net.read_sys_net('dev', 'attr', on_enoent=on_enoent)
+ error = handled_errors[0]
+ self.assertIsInstance(error, Exception)
+ self.assertIn('No such file or directory', str(error))
+
+ def test_read_sys_net_translates_content(self):
+ """read_sys_net translates content when translate dict is provided."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), content)
+ translate = {"you're welcome": 'de nada'}
+ self.assertEqual(
+ 'de nada',
+ net.read_sys_net('dev', 'attr', translate=translate))
+
+ def test_read_sys_net_errors_on_translation_failures(self):
+ """read_sys_net raises a KeyError and logs details on failure."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), content)
+ with self.assertRaises(KeyError) as context_manager:
+ net.read_sys_net('dev', 'attr', translate={})
+ error = context_manager.exception
+ self.assertEqual('"you\'re welcome"', str(error))
+ self.assertIn(
+ "Found unexpected (not translatable) value 'you're welcome' in "
+ "'{0}dev/attr".format(self.sysdir),
+ self.logs.getvalue())
+
+ def test_read_sys_net_handles_handles_with_onkeyerror(self):
+ """read_sys_net handles translation errors calling on_keyerror."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), content)
+ handled_errors = []
+
+ def on_keyerror(e):
+ handled_errors.append(e)
+
+ net.read_sys_net('dev', 'attr', translate={}, on_keyerror=on_keyerror)
+ error = handled_errors[0]
+ self.assertIsInstance(error, KeyError)
+ self.assertEqual('"you\'re welcome"', str(error))
+
+ def test_read_sys_net_safe_false_on_translate_failure(self):
+ """read_sys_net_safe returns False on translation failures."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), content)
+ self.assertFalse(net.read_sys_net_safe('dev', 'attr', translate={}))
+
+ def test_read_sys_net_safe_returns_false_on_noent_failure(self):
+ """read_sys_net_safe returns False on file not found failures."""
+ self.assertFalse(net.read_sys_net_safe('dev', 'attr'))
+
+ def test_read_sys_net_int_returns_none_on_error(self):
+ """read_sys_net_safe returns None on failures."""
+ self.assertFalse(net.read_sys_net_int('dev', 'attr'))
+
+ def test_read_sys_net_int_returns_none_on_valueerror(self):
+ """read_sys_net_safe returns None when content is not an int."""
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), 'NOTINT\n')
+ self.assertFalse(net.read_sys_net_int('dev', 'attr'))
+
+ def test_read_sys_net_int_returns_integer_from_content(self):
+ """read_sys_net_safe returns None on failures."""
+ write_file(os.path.join(self.sysdir, 'dev', 'attr'), '1\n')
+ self.assertEqual(1, net.read_sys_net_int('dev', 'attr'))
+
+ def test_is_up_true(self):
+ """is_up is True if sys/net/devname/operstate is 'up' or 'unknown'."""
+ for state in ['up', 'unknown']:
+ write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), state)
+ self.assertTrue(net.is_up('eth0'))
+
+ def test_is_up_false(self):
+ """is_up is False if sys/net/devname/operstate is 'down' or invalid."""
+ for state in ['down', 'incomprehensible']:
+ write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), state)
+ self.assertFalse(net.is_up('eth0'))
+
+ def test_is_wireless(self):
+ """is_wireless is True when /sys/net/devname/wireless exists."""
+ self.assertFalse(net.is_wireless('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'wireless'))
+ self.assertTrue(net.is_wireless('eth0'))
+
+ def test_is_bridge(self):
+ """is_bridge is True when /sys/net/devname/bridge exists."""
+ self.assertFalse(net.is_bridge('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'bridge'))
+ self.assertTrue(net.is_bridge('eth0'))
+
+ def test_is_bond(self):
+ """is_bond is True when /sys/net/devname/bonding exists."""
+ self.assertFalse(net.is_bond('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'bonding'))
+ self.assertTrue(net.is_bond('eth0'))
+
+ def test_is_vlan(self):
+ """is_vlan is True when /sys/net/devname/uevent has DEVTYPE=vlan."""
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'uevent'))
+ self.assertFalse(net.is_vlan('eth0'))
+ content = 'junk\nDEVTYPE=vlan\njunk\n'
+ write_file(os.path.join(self.sysdir, 'eth0', 'uevent'), content)
+ self.assertTrue(net.is_vlan('eth0'))
+
+ def test_is_connected_when_physically_connected(self):
+ """is_connected is True when /sys/net/devname/iflink reports 2."""
+ self.assertFalse(net.is_connected('eth0'))
+ write_file(os.path.join(self.sysdir, 'eth0', 'iflink'), "2")
+ self.assertTrue(net.is_connected('eth0'))
+
+ def test_is_connected_when_wireless_and_carrier_active(self):
+ """is_connected is True if wireless /sys/net/devname/carrier is 1."""
+ self.assertFalse(net.is_connected('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'wireless'))
+ self.assertFalse(net.is_connected('eth0'))
+ write_file(os.path.join(self.sysdir, 'eth0', 'carrier'), "1")
+ self.assertTrue(net.is_connected('eth0'))
+
+ def test_is_physical(self):
+ """is_physical is True when /sys/net/devname/device exists."""
+ self.assertFalse(net.is_physical('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'device'))
+ self.assertTrue(net.is_physical('eth0'))
+
+ def test_is_present(self):
+ """is_present is True when /sys/net/devname exists."""
+ self.assertFalse(net.is_present('eth0'))
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'device'))
+ self.assertTrue(net.is_present('eth0'))
+
+
+class TestGenerateFallbackConfig(CiTestCase):
+
+ def setUp(self):
+ super(TestGenerateFallbackConfig, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_generate_fallback_finds_connected_eth_with_mac(self):
+ """generate_fallback_config finds any connected device with a mac."""
+ write_file(os.path.join(self.sysdir, 'eth0', 'carrier'), '1')
+ write_file(os.path.join(self.sysdir, 'eth1', 'carrier'), '1')
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth1', 'address'), mac)
+ expected = {
+ 'config': [{'type': 'physical', 'mac_address': mac,
+ 'name': 'eth1', 'subnets': [{'type': 'dhcp'}]}],
+ 'version': 1}
+ self.assertEqual(expected, net.generate_fallback_config())
+
+ def test_generate_fallback_finds_dormant_eth_with_mac(self):
+ """generate_fallback_config finds any dormant device with a mac."""
+ write_file(os.path.join(self.sysdir, 'eth0', 'dormant'), '1')
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth0', 'address'), mac)
+ expected = {
+ 'config': [{'type': 'physical', 'mac_address': mac,
+ 'name': 'eth0', 'subnets': [{'type': 'dhcp'}]}],
+ 'version': 1}
+ self.assertEqual(expected, net.generate_fallback_config())
+
+ def test_generate_fallback_finds_eth_by_operstate(self):
+ """generate_fallback_config finds any dormant device with a mac."""
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth0', 'address'), mac)
+ expected = {
+ 'config': [{'type': 'physical', 'mac_address': mac,
+ 'name': 'eth0', 'subnets': [{'type': 'dhcp'}]}],
+ 'version': 1}
+ valid_operstates = ['dormant', 'down', 'lowerlayerdown', 'unknown']
+ for state in valid_operstates:
+ write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), state)
+ self.assertEqual(expected, net.generate_fallback_config())
+ write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), 'noworky')
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_veth(self):
+ """generate_fallback_config will skip any veth interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, 'veth0', 'carrier'), '1')
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_bridges(self):
+ """generate_fallback_config will skip any bridges interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, 'eth0', 'carrier'), '1')
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth0', 'address'), mac)
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'bridge'))
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_bonds(self):
+ """generate_fallback_config will skip any bonded interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, 'eth0', 'carrier'), '1')
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth0', 'address'), mac)
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'bonding'))
+ self.assertIsNone(net.generate_fallback_config())
+
+
+class TestGetDeviceList(CiTestCase):
+
+ def setUp(self):
+ super(TestGetDeviceList, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_get_devicelist_raise_oserror(self):
+ """get_devicelist raise any non-ENOENT OSerror."""
+ error = OSError('Can not do it')
+ error.errno = errno.EPERM # Set non-ENOENT
+ self.m_sys_path.side_effect = error
+ with self.assertRaises(OSError) as context_manager:
+ net.get_devicelist()
+ exception = context_manager.exception
+ self.assertEqual('Can not do it', str(exception))
+
+ def test_get_devicelist_empty_without_sys_net(self):
+ """get_devicelist returns empty list when missing SYS_CLASS_NET."""
+ self.m_sys_path.return_value = 'idontexist'
+ self.assertEqual([], net.get_devicelist())
+
+ def test_get_devicelist_empty_with_no_devices_in_sys_net(self):
+ """get_devicelist returns empty directoty listing for SYS_CLASS_NET."""
+ self.assertEqual([], net.get_devicelist())
+
+ def test_get_devicelist_lists_any_subdirectories_in_sys_net(self):
+ """get_devicelist returns a directory listing for SYS_CLASS_NET."""
+ write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), 'up')
+ write_file(os.path.join(self.sysdir, 'eth1', 'operstate'), 'up')
+ self.assertItemsEqual(['eth0', 'eth1'], net.get_devicelist())
+
+
+class TestGetInterfaceMAC(CiTestCase):
+
+ def setUp(self):
+ super(TestGetInterfaceMAC, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_get_interface_mac_false_with_no_mac(self):
+ """get_device_list returns False when no mac is reported."""
+ ensure_file(os.path.join(self.sysdir, 'eth0', 'bonding'))
+ mac_path = os.path.join(self.sysdir, 'eth0', 'address')
+ self.assertFalse(os.path.exists(mac_path))
+ self.assertFalse(net.get_interface_mac('eth0'))
+
+ def test_get_interface_mac(self):
+ """get_interfaces returns the mac from SYS_CLASS_NET/dev/address."""
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth1', 'address'), mac)
+ self.assertEqual(mac, net.get_interface_mac('eth1'))
+
+ def test_get_interface_mac_grabs_bonding_address(self):
+ """get_interfaces returns the source device mac for bonded devices."""
+ source_dev_mac = 'aa:bb:cc:aa:bb:cc'
+ bonded_mac = 'dd:ee:ff:dd:ee:ff'
+ write_file(os.path.join(self.sysdir, 'eth1', 'address'), bonded_mac)
+ write_file(
+ os.path.join(self.sysdir, 'eth1', 'bonding_slave', 'perm_hwaddr'),
+ source_dev_mac)
+ self.assertEqual(source_dev_mac, net.get_interface_mac('eth1'))
+
+ def test_get_interfaces_empty_list_without_sys_net(self):
+ """get_interfaces returns an empty list when missing SYS_CLASS_NET."""
+ self.m_sys_path.return_value = 'idontexist'
+ self.assertEqual([], net.get_interfaces())
+
+ def test_get_interfaces_by_mac_skips_empty_mac(self):
+ """Ignore 00:00:00:00:00:00 addresses from get_interfaces_by_mac."""
+ empty_mac = '00:00:00:00:00:00'
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth1', 'address'), empty_mac)
+ write_file(os.path.join(self.sysdir, 'eth1', 'addr_assign_type'), '0')
+ write_file(os.path.join(self.sysdir, 'eth2', 'addr_assign_type'), '0')
+ write_file(os.path.join(self.sysdir, 'eth2', 'address'), mac)
+ expected = [('eth2', 'aa:bb:cc:aa:bb:cc', None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+ def test_get_interfaces_by_mac_skips_missing_mac(self):
+ """Ignore interfaces without an address from get_interfaces_by_mac."""
+ write_file(os.path.join(self.sysdir, 'eth1', 'addr_assign_type'), '0')
+ address_path = os.path.join(self.sysdir, 'eth1', 'address')
+ self.assertFalse(os.path.exists(address_path))
+ mac = 'aa:bb:cc:aa:bb:cc'
+ write_file(os.path.join(self.sysdir, 'eth2', 'addr_assign_type'), '0')
+ write_file(os.path.join(self.sysdir, 'eth2', 'address'), mac)
+ expected = [('eth2', 'aa:bb:cc:aa:bb:cc', None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+
+class TestInterfaceHasOwnMAC(CiTestCase):
+
+ def setUp(self):
+ super(TestInterfaceHasOwnMAC, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_interface_has_own_mac_false_when_stolen(self):
+ """Return False from interface_has_own_mac when address is stolen."""
+ write_file(os.path.join(self.sysdir, 'eth1', 'addr_assign_type'), '2')
+ self.assertFalse(net.interface_has_own_mac('eth1'))
+
+ def test_interface_has_own_mac_true_when_not_stolen(self):
+ """Return False from interface_has_own_mac when mac isn't stolen."""
+ valid_assign_types = ['0', '1', '3']
+ assign_path = os.path.join(self.sysdir, 'eth1', 'addr_assign_type')
+ for _type in valid_assign_types:
+ write_file(assign_path, _type)
+ self.assertTrue(net.interface_has_own_mac('eth1'))
+
+ def test_interface_has_own_mac_strict_errors_on_absent_assign_type(self):
+ """When addr_assign_type is absent, interface_has_own_mac errors."""
+ with self.assertRaises(ValueError):
+ net.interface_has_own_mac('eth1', strict=True)
+
+
+@mock.patch('cloudinit.net.util.subp')
+class TestEphemeralIPV4Network(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestEphemeralIPV4Network, self).setUp()
+ sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + '/'
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_ephemeral_ipv4_network_errors_on_missing_params(self, m_subp):
+ """No required params for EphemeralIPv4Network can be None."""
+ required_params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'prefix_or_mask': '255.255.255.0', 'broadcast': '192.168.2.255'}
+ for key in required_params.keys():
+ params = copy.deepcopy(required_params)
+ params[key] = None
+ with self.assertRaises(ValueError) as context_manager:
+ net.EphemeralIPv4Network(**params)
+ error = context_manager.exception
+ self.assertIn('Cannot init network on', str(error))
+ self.assertEqual(0, m_subp.call_count)
+
+ def test_ephemeral_ipv4_network_errors_invalid_mask(self, m_subp):
+ """Raise an error when prefix_or_mask is not a netmask or prefix."""
+ params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'broadcast': '192.168.2.255'}
+ invalid_masks = ('invalid', 'invalid.', '123.123.123')
+ for error_val in invalid_masks:
+ params['prefix_or_mask'] = error_val
+ with self.assertRaises(ValueError) as context_manager:
+ with net.EphemeralIPv4Network(**params):
+ pass
+ error = context_manager.exception
+ self.assertIn('Cannot setup network: netmask', str(error))
+ self.assertEqual(0, m_subp.call_count)
+
+ def test_ephemeral_ipv4_network_performs_teardown(self, m_subp):
+ """EphemeralIPv4Network performs teardown on the device if setup."""
+ expected_setup_calls = [
+ mock.call(
+ ['ip', '-family', 'inet', 'addr', 'add', '192.168.2.2/24',
+ 'broadcast', '192.168.2.255', 'dev', 'eth0'],
+ capture=True, update_env={'LANG': 'C'}),
+ mock.call(
+ ['ip', '-family', 'inet', 'link', 'set', 'dev', 'eth0', 'up'],
+ capture=True)]
+ expected_teardown_calls = [
+ mock.call(
+ ['ip', '-family', 'inet', 'link', 'set', 'dev', 'eth0',
+ 'down'], capture=True),
+ mock.call(
+ ['ip', '-family', 'inet', 'addr', 'del', '192.168.2.2/24',
+ 'dev', 'eth0'], capture=True)]
+ params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'prefix_or_mask': '255.255.255.0', 'broadcast': '192.168.2.255'}
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(expected_setup_calls, m_subp.call_args_list)
+ m_subp.assert_has_calls(expected_teardown_calls)
+
+ def test_ephemeral_ipv4_network_noop_when_configured(self, m_subp):
+ """EphemeralIPv4Network handles exception when address is setup.
+
+ It performs no cleanup as the interface was already setup.
+ """
+ params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'prefix_or_mask': '255.255.255.0', 'broadcast': '192.168.2.255'}
+ m_subp.side_effect = ProcessExecutionError(
+ '', 'RTNETLINK answers: File exists', 2)
+ expected_calls = [
+ mock.call(
+ ['ip', '-family', 'inet', 'addr', 'add', '192.168.2.2/24',
+ 'broadcast', '192.168.2.255', 'dev', 'eth0'],
+ capture=True, update_env={'LANG': 'C'})]
+ with net.EphemeralIPv4Network(**params):
+ pass
+ self.assertEqual(expected_calls, m_subp.call_args_list)
+ self.assertIn(
+ 'Skip ephemeral network setup, eth0 already has address',
+ self.logs.getvalue())
+
+ def test_ephemeral_ipv4_network_with_prefix(self, m_subp):
+ """EphemeralIPv4Network takes a valid prefix to setup the network."""
+ params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'prefix_or_mask': '24', 'broadcast': '192.168.2.255'}
+ for prefix_val in ['24', 16]: # prefix can be int or string
+ params['prefix_or_mask'] = prefix_val
+ with net.EphemeralIPv4Network(**params):
+ pass
+ m_subp.assert_has_calls([mock.call(
+ ['ip', '-family', 'inet', 'addr', 'add', '192.168.2.2/24',
+ 'broadcast', '192.168.2.255', 'dev', 'eth0'],
+ capture=True, update_env={'LANG': 'C'})])
+ m_subp.assert_has_calls([mock.call(
+ ['ip', '-family', 'inet', 'addr', 'add', '192.168.2.2/16',
+ 'broadcast', '192.168.2.255', 'dev', 'eth0'],
+ capture=True, update_env={'LANG': 'C'})])
+
+ def test_ephemeral_ipv4_network_with_new_default_route(self, m_subp):
+ """Add the route when router is set and no default route exists."""
+ params = {
+ 'interface': 'eth0', 'ip': '192.168.2.2',
+ 'prefix_or_mask': '255.255.255.0', 'broadcast': '192.168.2.255',
+ 'router': '192.168.2.1'}
+ m_subp.return_value = '', '' # Empty response from ip route gw check
+ expected_setup_calls = [
+ mock.call(
+ ['ip', '-family', 'inet', 'addr', 'add', '192.168.2.2/24',
+ 'broadcast', '192.168.2.255', 'dev', 'eth0'],
+ capture=True, update_env={'LANG': 'C'}),
+ mock.call(
+ ['ip', '-family', 'inet', 'link', 'set', 'dev', 'eth0', 'up'],
+ capture=True),
+ mock.call(
+ ['ip', 'route', 'show', '0.0.0.0/0'], capture=True),
+ mock.call(
+ ['ip', '-4', 'route', 'add', 'default', 'via',
+ '192.168.2.1', 'dev', 'eth0'], capture=True)]
+ expected_teardown_calls = [mock.call(
+ ['ip', '-4', 'route', 'del', 'default', 'dev', 'eth0'],
+ capture=True)]
+
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(expected_setup_calls, m_subp.call_args_list)
+ m_subp.assert_has_calls(expected_teardown_calls)