summaryrefslogtreecommitdiff
path: root/cloudinit/net/tests/test_init.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/net/tests/test_init.py')
-rw-r--r--cloudinit/net/tests/test_init.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py
index d393e6ad..e6e77d7a 100644
--- a/cloudinit/net/tests/test_init.py
+++ b/cloudinit/net/tests/test_init.py
@@ -708,3 +708,216 @@ class TestHasURLConnectivity(HttprettyTestCase):
httpretty.register_uri(httpretty.GET, self.url, body={}, status=404)
self.assertFalse(
net.has_url_connectivity(self.url), 'Expected False on url fail')
+
+
+def _mk_v1_phys(mac, name, driver, device_id):
+ v1_cfg = {'type': 'physical', 'name': name, 'mac_address': mac}
+ params = {}
+ if driver:
+ params.update({'driver': driver})
+ if device_id:
+ params.update({'device_id': device_id})
+
+ if params:
+ v1_cfg.update({'params': params})
+
+ return v1_cfg
+
+
+def _mk_v2_phys(mac, name, driver=None, device_id=None):
+ v2_cfg = {'set-name': name, 'match': {'macaddress': mac}}
+ if driver:
+ v2_cfg['match'].update({'driver': driver})
+ if device_id:
+ v2_cfg['match'].update({'device_id': device_id})
+
+ return v2_cfg
+
+
+class TestExtractPhysdevs(CiTestCase):
+
+ def setUp(self):
+ super(TestExtractPhysdevs, self).setUp()
+ self.add_patch('cloudinit.net.device_driver', 'm_driver')
+ self.add_patch('cloudinit.net.device_devid', 'm_devid')
+
+ def test_extract_physdevs_looks_up_driver_v1(self):
+ driver = 'virtio'
+ self.m_driver.return_value = driver
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', None, '0x1000'],
+ ]
+ netcfg = {
+ 'version': 1,
+ 'config': [_mk_v1_phys(*args) for args in physdevs],
+ }
+ # insert the driver value for verification
+ physdevs[0][2] = driver
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+ self.m_driver.assert_called_with('eth0')
+
+ def test_extract_physdevs_looks_up_driver_v2(self):
+ driver = 'virtio'
+ self.m_driver.return_value = driver
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', None, '0x1000'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ # insert the driver value for verification
+ physdevs[0][2] = driver
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+ self.m_driver.assert_called_with('eth0')
+
+ def test_extract_physdevs_looks_up_devid_v1(self):
+ devid = '0x1000'
+ self.m_devid.return_value = devid
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', None],
+ ]
+ netcfg = {
+ 'version': 1,
+ 'config': [_mk_v1_phys(*args) for args in physdevs],
+ }
+ # insert the driver value for verification
+ physdevs[0][3] = devid
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+ self.m_devid.assert_called_with('eth0')
+
+ def test_extract_physdevs_looks_up_devid_v2(self):
+ devid = '0x1000'
+ self.m_devid.return_value = devid
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', None],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ # insert the driver value for verification
+ physdevs[0][3] = devid
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+ self.m_devid.assert_called_with('eth0')
+
+ def test_get_v1_type_physical(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ['09:87:65:43:21:10', 'ens0p1', 'mlx4_core', '0:0:1000'],
+ ]
+ netcfg = {
+ 'version': 1,
+ 'config': [_mk_v1_phys(*args) for args in physdevs],
+ }
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+
+ def test_get_v2_type_physical(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ['09:87:65:43:21:10', 'ens0p1', 'mlx4_core', '0:0:1000'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ self.assertEqual(sorted(physdevs),
+ sorted(net.extract_physdevs(netcfg)))
+
+ def test_get_v2_type_physical_skips_if_no_set_name(self):
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {
+ 'ens3': {
+ 'match': {'macaddress': '00:11:22:33:44:55'},
+ }
+ }
+ }
+ self.assertEqual([], net.extract_physdevs(netcfg))
+
+ def test_runtime_error_on_unknown_netcfg_version(self):
+ with self.assertRaises(RuntimeError):
+ net.extract_physdevs({'version': 3, 'awesome_config': []})
+
+
+class TestWaitForPhysdevs(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestWaitForPhysdevs, self).setUp()
+ self.add_patch('cloudinit.net.get_interfaces_by_mac',
+ 'm_get_iface_mac')
+ self.add_patch('cloudinit.util.udevadm_settle', 'm_udev_settle')
+
+ def test_wait_for_physdevs_skips_settle_if_all_present(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args)
+ for args in physdevs},
+ }
+ self.m_get_iface_mac.side_effect = iter([
+ {'aa:bb:cc:dd:ee:ff': 'eth0',
+ '00:11:22:33:44:55': 'ens3'},
+ ])
+ net.wait_for_physdevs(netcfg)
+ self.assertEqual(0, self.m_udev_settle.call_count)
+
+ def test_wait_for_physdevs_calls_udev_settle_on_missing(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args)
+ for args in physdevs},
+ }
+ self.m_get_iface_mac.side_effect = iter([
+ {'aa:bb:cc:dd:ee:ff': 'eth0'}, # first call ens3 is missing
+ {'aa:bb:cc:dd:ee:ff': 'eth0',
+ '00:11:22:33:44:55': 'ens3'}, # second call has both
+ ])
+ net.wait_for_physdevs(netcfg)
+ self.m_udev_settle.assert_called_with(exists=net.sys_dev_path('ens3'))
+
+ def test_wait_for_physdevs_raise_runtime_error_if_missing_and_strict(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args)
+ for args in physdevs},
+ }
+ self.m_get_iface_mac.return_value = {}
+ with self.assertRaises(RuntimeError):
+ net.wait_for_physdevs(netcfg)
+
+ self.assertEqual(5 * len(physdevs), self.m_udev_settle.call_count)
+
+ def test_wait_for_physdevs_no_raise_if_not_strict(self):
+ physdevs = [
+ ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
+ ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
+ ]
+ netcfg = {
+ 'version': 2,
+ 'ethernets': {args[1]: _mk_v2_phys(*args)
+ for args in physdevs},
+ }
+ self.m_get_iface_mac.return_value = {}
+ net.wait_for_physdevs(netcfg, strict=False)
+ self.assertEqual(5 * len(physdevs), self.m_udev_settle.call_count)