summaryrefslogtreecommitdiff
path: root/tests/unittests/net
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/net')
-rw-r--r--tests/unittests/net/__init__.py0
-rw-r--r--tests/unittests/net/test_dhcp.py797
-rw-r--r--tests/unittests/net/test_init.py1734
-rw-r--r--tests/unittests/net/test_network_state.py222
-rw-r--r--tests/unittests/net/test_networkd.py64
5 files changed, 2817 insertions, 0 deletions
diff --git a/tests/unittests/net/__init__.py b/tests/unittests/net/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/net/__init__.py
diff --git a/tests/unittests/net/test_dhcp.py b/tests/unittests/net/test_dhcp.py
new file mode 100644
index 00000000..876873d5
--- /dev/null
+++ b/tests/unittests/net/test_dhcp.py
@@ -0,0 +1,797 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+import signal
+from textwrap import dedent
+
+import httpretty
+
+import cloudinit.net as net
+from cloudinit.net.dhcp import (
+ InvalidDHCPLeaseFileError,
+ dhcp_discovery,
+ maybe_perform_dhcp_discovery,
+ networkd_load_leases,
+ parse_dhcp_lease_file,
+ parse_static_routes,
+)
+from cloudinit.util import ensure_file, write_file
+from tests.unittests.helpers import (
+ CiTestCase,
+ HttprettyTestCase,
+ mock,
+ populate_dir,
+ wrap_and_call,
+)
+
+
+class TestParseDHCPLeasesFile(CiTestCase):
+ def test_parse_empty_lease_file_errors(self):
+ """parse_dhcp_lease_file errors when file content is empty."""
+ empty_file = self.tmp_path("leases")
+ ensure_file(empty_file)
+ with self.assertRaises(InvalidDHCPLeaseFileError) as context_manager:
+ parse_dhcp_lease_file(empty_file)
+ error = context_manager.exception
+ self.assertIn("Cannot parse empty dhcp lease file", str(error))
+
+ def test_parse_malformed_lease_file_content_errors(self):
+ """parse_dhcp_lease_file errors when file content isn't dhcp leases."""
+ non_lease_file = self.tmp_path("leases")
+ write_file(non_lease_file, "hi mom.")
+ with self.assertRaises(InvalidDHCPLeaseFileError) as context_manager:
+ parse_dhcp_lease_file(non_lease_file)
+ error = context_manager.exception
+ self.assertIn("Cannot parse dhcp lease file", str(error))
+
+ def test_parse_multiple_leases(self):
+ """parse_dhcp_lease_file returns a list of all leases within."""
+ lease_file = self.tmp_path("leases")
+ content = dedent(
+ """
+ lease {
+ interface "wlp3s0";
+ fixed-address 192.168.2.74;
+ filename "http://192.168.2.50/boot.php?mac=${netX}";
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ renew 4 2017/07/27 18:02:30;
+ expire 5 2017/07/28 07:08:15;
+ }
+ lease {
+ interface "wlp3s0";
+ fixed-address 192.168.2.74;
+ filename "http://192.168.2.50/boot.php?mac=${netX}";
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """
+ )
+ expected = [
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ "renew": "4 2017/07/27 18:02:30",
+ "expire": "5 2017/07/28 07:08:15",
+ "filename": "http://192.168.2.50/boot.php?mac=${netX}",
+ },
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "filename": "http://192.168.2.50/boot.php?mac=${netX}",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ },
+ ]
+ write_file(lease_file, content)
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
+
+
+class TestDHCPRFC3442(CiTestCase):
+ def test_parse_lease_finds_rfc3442_classless_static_routes(self):
+ """parse_dhcp_lease_file returns rfc3442-classless-static-routes."""
+ lease_file = self.tmp_path("leases")
+ content = dedent(
+ """
+ lease {
+ interface "wlp3s0";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ option rfc3442-classless-static-routes 0,130,56,240,1;
+ renew 4 2017/07/27 18:02:30;
+ expire 5 2017/07/28 07:08:15;
+ }
+ """
+ )
+ expected = [
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ "rfc3442-classless-static-routes": "0,130,56,240,1",
+ "renew": "4 2017/07/27 18:02:30",
+ "expire": "5 2017/07/28 07:08:15",
+ }
+ ]
+ write_file(lease_file, content)
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
+
+ def test_parse_lease_finds_classless_static_routes(self):
+ """
+ parse_dhcp_lease_file returns classless-static-routes
+ for Centos lease format.
+ """
+ lease_file = self.tmp_path("leases")
+ content = dedent(
+ """
+ lease {
+ interface "wlp3s0";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ option classless-static-routes 0 130.56.240.1;
+ renew 4 2017/07/27 18:02:30;
+ expire 5 2017/07/28 07:08:15;
+ }
+ """
+ )
+ expected = [
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ "classless-static-routes": "0 130.56.240.1",
+ "renew": "4 2017/07/27 18:02:30",
+ "expire": "5 2017/07/28 07:08:15",
+ }
+ ]
+ write_file(lease_file, content)
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
+
+ @mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network")
+ @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+ def test_obtain_lease_parses_static_routes(self, m_maybe, m_ipv4):
+ """EphemeralDHPCv4 parses rfc3442 routes for EphemeralIPv4Network"""
+ lease = [
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ "rfc3442-classless-static-routes": "0,130,56,240,1",
+ "renew": "4 2017/07/27 18:02:30",
+ "expire": "5 2017/07/28 07:08:15",
+ }
+ ]
+ m_maybe.return_value = lease
+ eph = net.dhcp.EphemeralDHCPv4()
+ eph.obtain_lease()
+ expected_kwargs = {
+ "interface": "wlp3s0",
+ "ip": "192.168.2.74",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ "static_routes": [("0.0.0.0/0", "130.56.240.1")],
+ "router": "192.168.2.1",
+ }
+ m_ipv4.assert_called_with(**expected_kwargs)
+
+ @mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network")
+ @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+ def test_obtain_centos_lease_parses_static_routes(self, m_maybe, m_ipv4):
+ """
+ EphemeralDHPCv4 parses rfc3442 routes for EphemeralIPv4Network
+ for Centos Lease format
+ """
+ lease = [
+ {
+ "interface": "wlp3s0",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ "classless-static-routes": "0 130.56.240.1",
+ "renew": "4 2017/07/27 18:02:30",
+ "expire": "5 2017/07/28 07:08:15",
+ }
+ ]
+ m_maybe.return_value = lease
+ eph = net.dhcp.EphemeralDHCPv4()
+ eph.obtain_lease()
+ expected_kwargs = {
+ "interface": "wlp3s0",
+ "ip": "192.168.2.74",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ "static_routes": [("0.0.0.0/0", "130.56.240.1")],
+ "router": "192.168.2.1",
+ }
+ m_ipv4.assert_called_with(**expected_kwargs)
+
+
+class TestDHCPParseStaticRoutes(CiTestCase):
+
+ with_logs = True
+
+ def parse_static_routes_empty_string(self):
+ self.assertEqual([], parse_static_routes(""))
+
+ def test_parse_static_routes_invalid_input_returns_empty_list(self):
+ rfc3442 = "32,169,254,169,254,130,56,248"
+ self.assertEqual([], parse_static_routes(rfc3442))
+
+ def test_parse_static_routes_bogus_width_returns_empty_list(self):
+ rfc3442 = "33,169,254,169,254,130,56,248"
+ self.assertEqual([], parse_static_routes(rfc3442))
+
+ def test_parse_static_routes_single_ip(self):
+ rfc3442 = "32,169,254,169,254,130,56,248,255"
+ self.assertEqual(
+ [("169.254.169.254/32", "130.56.248.255")],
+ parse_static_routes(rfc3442),
+ )
+
+ def test_parse_static_routes_single_ip_handles_trailing_semicolon(self):
+ rfc3442 = "32,169,254,169,254,130,56,248,255;"
+ self.assertEqual(
+ [("169.254.169.254/32", "130.56.248.255")],
+ parse_static_routes(rfc3442),
+ )
+
+ def test_parse_static_routes_default_route(self):
+ rfc3442 = "0,130,56,240,1"
+ self.assertEqual(
+ [("0.0.0.0/0", "130.56.240.1")], parse_static_routes(rfc3442)
+ )
+
+ def test_unspecified_gateway(self):
+ rfc3442 = "32,169,254,169,254,0,0,0,0"
+ self.assertEqual(
+ [("169.254.169.254/32", "0.0.0.0")], parse_static_routes(rfc3442)
+ )
+
+ def test_parse_static_routes_class_c_b_a(self):
+ class_c = "24,192,168,74,192,168,0,4"
+ class_b = "16,172,16,172,16,0,4"
+ class_a = "8,10,10,0,0,4"
+ rfc3442 = ",".join([class_c, class_b, class_a])
+ self.assertEqual(
+ sorted(
+ [
+ ("192.168.74.0/24", "192.168.0.4"),
+ ("172.16.0.0/16", "172.16.0.4"),
+ ("10.0.0.0/8", "10.0.0.4"),
+ ]
+ ),
+ sorted(parse_static_routes(rfc3442)),
+ )
+
+ def test_parse_static_routes_logs_error_truncated(self):
+ bad_rfc3442 = {
+ "class_c": "24,169,254,169,10",
+ "class_b": "16,172,16,10",
+ "class_a": "8,10,10",
+ "gateway": "0,0",
+ "netlen": "33,0",
+ }
+ for rfc3442 in bad_rfc3442.values():
+ self.assertEqual([], parse_static_routes(rfc3442))
+
+ logs = self.logs.getvalue()
+ self.assertEqual(len(bad_rfc3442.keys()), len(logs.splitlines()))
+
+ def test_parse_static_routes_returns_valid_routes_until_parse_err(self):
+ class_c = "24,192,168,74,192,168,0,4"
+ class_b = "16,172,16,172,16,0,4"
+ class_a_error = "8,10,10,0,0"
+ rfc3442 = ",".join([class_c, class_b, class_a_error])
+ self.assertEqual(
+ sorted(
+ [
+ ("192.168.74.0/24", "192.168.0.4"),
+ ("172.16.0.0/16", "172.16.0.4"),
+ ]
+ ),
+ sorted(parse_static_routes(rfc3442)),
+ )
+
+ logs = self.logs.getvalue()
+ self.assertIn(rfc3442, logs.splitlines()[0])
+
+ def test_redhat_format(self):
+ redhat_format = "24.191.168.128 192.168.128.1,0 192.168.128.1"
+ self.assertEqual(
+ sorted(
+ [
+ ("191.168.128.0/24", "192.168.128.1"),
+ ("0.0.0.0/0", "192.168.128.1"),
+ ]
+ ),
+ sorted(parse_static_routes(redhat_format)),
+ )
+
+ def test_redhat_format_with_a_space_too_much_after_comma(self):
+ redhat_format = "24.191.168.128 192.168.128.1, 0 192.168.128.1"
+ self.assertEqual(
+ sorted(
+ [
+ ("191.168.128.0/24", "192.168.128.1"),
+ ("0.0.0.0/0", "192.168.128.1"),
+ ]
+ ),
+ sorted(parse_static_routes(redhat_format)),
+ )
+
+
+class TestDHCPDiscoveryClean(CiTestCase):
+ with_logs = True
+
+ @mock.patch("cloudinit.net.dhcp.find_fallback_nic")
+ def test_no_fallback_nic_found(self, m_fallback_nic):
+ """Log and do nothing when nic is absent and no fallback is found."""
+ m_fallback_nic.return_value = None # No fallback nic found
+ self.assertEqual([], maybe_perform_dhcp_discovery())
+ self.assertIn(
+ "Skip dhcp_discovery: Unable to find fallback nic.",
+ self.logs.getvalue(),
+ )
+
+ def test_provided_nic_does_not_exist(self):
+ """When the provided nic doesn't exist, log a message and no-op."""
+ self.assertEqual([], maybe_perform_dhcp_discovery("idontexist"))
+ self.assertIn(
+ "Skip dhcp_discovery: nic idontexist not found in get_devicelist.",
+ self.logs.getvalue(),
+ )
+
+ @mock.patch("cloudinit.net.dhcp.subp.which")
+ @mock.patch("cloudinit.net.dhcp.find_fallback_nic")
+ def test_absent_dhclient_command(self, m_fallback, m_which):
+ """When dhclient doesn't exist in the OS, log the issue and no-op."""
+ m_fallback.return_value = "eth9"
+ m_which.return_value = None # dhclient isn't found
+ self.assertEqual([], maybe_perform_dhcp_discovery())
+ self.assertIn(
+ "Skip dhclient configuration: No dhclient command found.",
+ self.logs.getvalue(),
+ )
+
+ @mock.patch("cloudinit.temp_utils.os.getuid")
+ @mock.patch("cloudinit.net.dhcp.dhcp_discovery")
+ @mock.patch("cloudinit.net.dhcp.subp.which")
+ @mock.patch("cloudinit.net.dhcp.find_fallback_nic")
+ def test_dhclient_run_with_tmpdir(self, m_fback, m_which, m_dhcp, m_uid):
+ """maybe_perform_dhcp_discovery passes tmpdir to dhcp_discovery."""
+ m_uid.return_value = 0 # Fake root user for tmpdir
+ m_fback.return_value = "eth9"
+ m_which.return_value = "/sbin/dhclient"
+ m_dhcp.return_value = {"address": "192.168.2.2"}
+ retval = wrap_and_call(
+ "cloudinit.temp_utils",
+ {"_TMPDIR": {"new": None}, "os.getuid": 0},
+ maybe_perform_dhcp_discovery,
+ )
+ self.assertEqual({"address": "192.168.2.2"}, retval)
+ self.assertEqual(
+ 1, m_dhcp.call_count, "dhcp_discovery not called once"
+ )
+ call = m_dhcp.call_args_list[0]
+ self.assertEqual("/sbin/dhclient", call[0][0])
+ self.assertEqual("eth9", call[0][1])
+ self.assertIn("/var/tmp/cloud-init/cloud-init-dhcp-", call[0][2])
+
+ @mock.patch("time.sleep", mock.MagicMock())
+ @mock.patch("cloudinit.net.dhcp.os.kill")
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ def test_dhcp_discovery_run_in_sandbox_warns_invalid_pid(
+ self, m_subp, m_kill
+ ):
+ """dhcp_discovery logs a warning when pidfile contains invalid content.
+
+ Lease processing still occurs and no proc kill is attempted.
+ """
+ m_subp.return_value = ("", "")
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, "dhclient.orig")
+ script_content = "#!/bin/bash\necho fake-dhclient"
+ write_file(dhclient_script, script_content, mode=0o755)
+ write_file(self.tmp_path("dhclient.pid", tmpdir), "") # Empty pid ''
+ lease_content = dedent(
+ """
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """
+ )
+ write_file(self.tmp_path("dhcp.leases", tmpdir), lease_content)
+
+ self.assertCountEqual(
+ [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ }
+ ],
+ dhcp_discovery(dhclient_script, "eth9", tmpdir),
+ )
+ self.assertIn(
+ "dhclient(pid=, parentpid=unknown) failed "
+ "to daemonize after 10.0 seconds",
+ self.logs.getvalue(),
+ )
+ m_kill.assert_not_called()
+
+ @mock.patch("cloudinit.net.dhcp.util.get_proc_ppid")
+ @mock.patch("cloudinit.net.dhcp.os.kill")
+ @mock.patch("cloudinit.net.dhcp.util.wait_for_files")
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ def test_dhcp_discovery_run_in_sandbox_waits_on_lease_and_pid(
+ self, m_subp, m_wait, m_kill, m_getppid
+ ):
+ """dhcp_discovery waits for the presence of pidfile and dhcp.leases."""
+ m_subp.return_value = ("", "")
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, "dhclient.orig")
+ script_content = "#!/bin/bash\necho fake-dhclient"
+ write_file(dhclient_script, script_content, mode=0o755)
+ # Don't create pid or leases file
+ pidfile = self.tmp_path("dhclient.pid", tmpdir)
+ leasefile = self.tmp_path("dhcp.leases", tmpdir)
+ m_wait.return_value = [pidfile] # Return the missing pidfile wait for
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+ self.assertEqual([], dhcp_discovery(dhclient_script, "eth9", tmpdir))
+ self.assertEqual(
+ mock.call([pidfile, leasefile], maxwait=5, naplen=0.01),
+ m_wait.call_args_list[0],
+ )
+ self.assertIn(
+ "WARNING: dhclient did not produce expected files: dhclient.pid",
+ self.logs.getvalue(),
+ )
+ m_kill.assert_not_called()
+
+ @mock.patch("cloudinit.net.dhcp.util.get_proc_ppid")
+ @mock.patch("cloudinit.net.dhcp.os.kill")
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ def test_dhcp_discovery_run_in_sandbox(self, m_subp, m_kill, m_getppid):
+ """dhcp_discovery brings up the interface and runs dhclient.
+
+ It also returns the parsed dhcp.leases file generated in the sandbox.
+ """
+ m_subp.return_value = ("", "")
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, "dhclient.orig")
+ script_content = "#!/bin/bash\necho fake-dhclient"
+ write_file(dhclient_script, script_content, mode=0o755)
+ lease_content = dedent(
+ """
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """
+ )
+ lease_file = os.path.join(tmpdir, "dhcp.leases")
+ write_file(lease_file, lease_content)
+ pid_file = os.path.join(tmpdir, "dhclient.pid")
+ my_pid = 1
+ write_file(pid_file, "%d\n" % my_pid)
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+
+ self.assertCountEqual(
+ [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ }
+ ],
+ dhcp_discovery(dhclient_script, "eth9", tmpdir),
+ )
+ # dhclient script got copied
+ with open(os.path.join(tmpdir, "dhclient")) as stream:
+ self.assertEqual(script_content, stream.read())
+ # Interface was brought up before dhclient called from sandbox
+ m_subp.assert_has_calls(
+ [
+ mock.call(
+ ["ip", "link", "set", "dev", "eth9", "up"], capture=True
+ ),
+ mock.call(
+ [
+ os.path.join(tmpdir, "dhclient"),
+ "-1",
+ "-v",
+ "-lf",
+ lease_file,
+ "-pf",
+ os.path.join(tmpdir, "dhclient.pid"),
+ "eth9",
+ "-sf",
+ "/bin/true",
+ ],
+ capture=True,
+ ),
+ ]
+ )
+ m_kill.assert_has_calls([mock.call(my_pid, signal.SIGKILL)])
+
+ @mock.patch("cloudinit.net.dhcp.util.get_proc_ppid")
+ @mock.patch("cloudinit.net.dhcp.os.kill")
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ def test_dhcp_discovery_outside_sandbox(self, m_subp, m_kill, m_getppid):
+ """dhcp_discovery brings up the interface and runs dhclient.
+
+ It also returns the parsed dhcp.leases file generated in the sandbox.
+ """
+ m_subp.return_value = ("", "")
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, "dhclient.orig")
+ script_content = "#!/bin/bash\necho fake-dhclient"
+ write_file(dhclient_script, script_content, mode=0o755)
+ lease_content = dedent(
+ """
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """
+ )
+ lease_file = os.path.join(tmpdir, "dhcp.leases")
+ write_file(lease_file, lease_content)
+ pid_file = os.path.join(tmpdir, "dhclient.pid")
+ my_pid = 1
+ write_file(pid_file, "%d\n" % my_pid)
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+
+ with mock.patch("os.access", return_value=False):
+ self.assertCountEqual(
+ [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.74",
+ "subnet-mask": "255.255.255.0",
+ "routers": "192.168.2.1",
+ }
+ ],
+ dhcp_discovery(dhclient_script, "eth9", tmpdir),
+ )
+ # dhclient script got copied
+ with open(os.path.join(tmpdir, "dhclient.orig")) as stream:
+ self.assertEqual(script_content, stream.read())
+ # Interface was brought up before dhclient called from sandbox
+ m_subp.assert_has_calls(
+ [
+ mock.call(
+ ["ip", "link", "set", "dev", "eth9", "up"], capture=True
+ ),
+ mock.call(
+ [
+ os.path.join(tmpdir, "dhclient.orig"),
+ "-1",
+ "-v",
+ "-lf",
+ lease_file,
+ "-pf",
+ os.path.join(tmpdir, "dhclient.pid"),
+ "eth9",
+ "-sf",
+ "/bin/true",
+ ],
+ capture=True,
+ ),
+ ]
+ )
+ m_kill.assert_has_calls([mock.call(my_pid, signal.SIGKILL)])
+
+ @mock.patch("cloudinit.net.dhcp.util.get_proc_ppid")
+ @mock.patch("cloudinit.net.dhcp.os.kill")
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ def test_dhcp_output_error_stream(self, m_subp, m_kill, m_getppid):
+ """ "dhcp_log_func is called with the output and error streams of
+ dhclinet when the callable is passed."""
+ dhclient_err = "FAKE DHCLIENT ERROR"
+ dhclient_out = "FAKE DHCLIENT OUT"
+ m_subp.return_value = (dhclient_out, dhclient_err)
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, "dhclient.orig")
+ script_content = "#!/bin/bash\necho fake-dhclient"
+ write_file(dhclient_script, script_content, mode=0o755)
+ lease_content = dedent(
+ """
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """
+ )
+ lease_file = os.path.join(tmpdir, "dhcp.leases")
+ write_file(lease_file, lease_content)
+ pid_file = os.path.join(tmpdir, "dhclient.pid")
+ my_pid = 1
+ write_file(pid_file, "%d\n" % my_pid)
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+
+ def dhcp_log_func(out, err):
+ self.assertEqual(out, dhclient_out)
+ self.assertEqual(err, dhclient_err)
+
+ dhcp_discovery(
+ dhclient_script, "eth9", tmpdir, dhcp_log_func=dhcp_log_func
+ )
+
+
+class TestSystemdParseLeases(CiTestCase):
+
+ lxd_lease = dedent(
+ """\
+ # This is private data. Do not parse.
+ ADDRESS=10.75.205.242
+ NETMASK=255.255.255.0
+ ROUTER=10.75.205.1
+ SERVER_ADDRESS=10.75.205.1
+ NEXT_SERVER=10.75.205.1
+ BROADCAST=10.75.205.255
+ T1=1580
+ T2=2930
+ LIFETIME=3600
+ DNS=10.75.205.1
+ DOMAINNAME=lxd
+ HOSTNAME=a1
+ CLIENTID=ffe617693400020000ab110c65a6a0866931c2
+ """
+ )
+
+ lxd_parsed = {
+ "ADDRESS": "10.75.205.242",
+ "NETMASK": "255.255.255.0",
+ "ROUTER": "10.75.205.1",
+ "SERVER_ADDRESS": "10.75.205.1",
+ "NEXT_SERVER": "10.75.205.1",
+ "BROADCAST": "10.75.205.255",
+ "T1": "1580",
+ "T2": "2930",
+ "LIFETIME": "3600",
+ "DNS": "10.75.205.1",
+ "DOMAINNAME": "lxd",
+ "HOSTNAME": "a1",
+ "CLIENTID": "ffe617693400020000ab110c65a6a0866931c2",
+ }
+
+ azure_lease = dedent(
+ """\
+ # This is private data. Do not parse.
+ ADDRESS=10.132.0.5
+ NETMASK=255.255.255.255
+ ROUTER=10.132.0.1
+ SERVER_ADDRESS=169.254.169.254
+ NEXT_SERVER=10.132.0.1
+ MTU=1460
+ T1=43200
+ T2=75600
+ LIFETIME=86400
+ DNS=169.254.169.254
+ NTP=169.254.169.254
+ DOMAINNAME=c.ubuntu-foundations.internal
+ DOMAIN_SEARCH_LIST=c.ubuntu-foundations.internal google.internal
+ HOSTNAME=tribaal-test-171002-1349.c.ubuntu-foundations.internal
+ ROUTES=10.132.0.1/32,0.0.0.0 0.0.0.0/0,10.132.0.1
+ CLIENTID=ff405663a200020000ab11332859494d7a8b4c
+ OPTION_245=624c3620
+ """
+ )
+
+ azure_parsed = {
+ "ADDRESS": "10.132.0.5",
+ "NETMASK": "255.255.255.255",
+ "ROUTER": "10.132.0.1",
+ "SERVER_ADDRESS": "169.254.169.254",
+ "NEXT_SERVER": "10.132.0.1",
+ "MTU": "1460",
+ "T1": "43200",
+ "T2": "75600",
+ "LIFETIME": "86400",
+ "DNS": "169.254.169.254",
+ "NTP": "169.254.169.254",
+ "DOMAINNAME": "c.ubuntu-foundations.internal",
+ "DOMAIN_SEARCH_LIST": "c.ubuntu-foundations.internal google.internal",
+ "HOSTNAME": "tribaal-test-171002-1349.c.ubuntu-foundations.internal",
+ "ROUTES": "10.132.0.1/32,0.0.0.0 0.0.0.0/0,10.132.0.1",
+ "CLIENTID": "ff405663a200020000ab11332859494d7a8b4c",
+ "OPTION_245": "624c3620",
+ }
+
+ def setUp(self):
+ super(TestSystemdParseLeases, self).setUp()
+ self.lease_d = self.tmp_dir()
+
+ def test_no_leases_returns_empty_dict(self):
+ """A leases dir with no lease files should return empty dictionary."""
+ self.assertEqual({}, networkd_load_leases(self.lease_d))
+
+ def test_no_leases_dir_returns_empty_dict(self):
+ """A non-existing leases dir should return empty dict."""
+ enodir = os.path.join(self.lease_d, "does-not-exist")
+ self.assertEqual({}, networkd_load_leases(enodir))
+
+ def test_single_leases_file(self):
+ """A leases dir with one leases file."""
+ populate_dir(self.lease_d, {"2": self.lxd_lease})
+ self.assertEqual(
+ {"2": self.lxd_parsed}, networkd_load_leases(self.lease_d)
+ )
+
+ def test_single_azure_leases_file(self):
+ """On Azure, option 245 should be present, verify it specifically."""
+ populate_dir(self.lease_d, {"1": self.azure_lease})
+ self.assertEqual(
+ {"1": self.azure_parsed}, networkd_load_leases(self.lease_d)
+ )
+
+ def test_multiple_files(self):
+ """Multiple leases files on azure with one found return that value."""
+ self.maxDiff = None
+ populate_dir(
+ self.lease_d, {"1": self.azure_lease, "9": self.lxd_lease}
+ )
+ self.assertEqual(
+ {"1": self.azure_parsed, "9": self.lxd_parsed},
+ networkd_load_leases(self.lease_d),
+ )
+
+
+class TestEphemeralDhcpNoNetworkSetup(HttprettyTestCase):
+ @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+ def test_ephemeral_dhcp_no_network_if_url_connectivity(self, m_dhcp):
+ """No EphemeralDhcp4 network setup when connectivity_url succeeds."""
+ url = "http://example.org/index.html"
+
+ httpretty.register_uri(httpretty.GET, url)
+ with net.dhcp.EphemeralDHCPv4(
+ connectivity_url_data={"url": url},
+ ) as lease:
+ self.assertIsNone(lease)
+ # Ensure that no teardown happens:
+ m_dhcp.assert_not_called()
+
+ @mock.patch("cloudinit.net.dhcp.subp.subp")
+ @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+ def test_ephemeral_dhcp_setup_network_if_url_connectivity(
+ self, m_dhcp, m_subp
+ ):
+ """No EphemeralDhcp4 network setup when connectivity_url succeeds."""
+ url = "http://example.org/index.html"
+ fake_lease = {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.2",
+ "subnet-mask": "255.255.0.0",
+ }
+ m_dhcp.return_value = [fake_lease]
+ m_subp.return_value = ("", "")
+
+ httpretty.register_uri(httpretty.GET, url, body={}, status=404)
+ with net.dhcp.EphemeralDHCPv4(
+ connectivity_url_data={"url": url},
+ ) as lease:
+ self.assertEqual(fake_lease, lease)
+ # Ensure that dhcp discovery occurs
+ m_dhcp.called_once_with()
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/net/test_init.py b/tests/unittests/net/test_init.py
new file mode 100644
index 00000000..18b3fe59
--- /dev/null
+++ b/tests/unittests/net/test_init.py
@@ -0,0 +1,1734 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import errno
+import ipaddress
+import os
+import textwrap
+from unittest import mock
+
+import httpretty
+import pytest
+import requests
+
+import cloudinit.net as net
+from cloudinit import safeyaml as yaml
+from cloudinit.subp import ProcessExecutionError
+from cloudinit.util import ensure_file, write_file
+from tests.unittests.helpers import CiTestCase, HttprettyTestCase
+
+
+class TestSysDevPath(CiTestCase):
+ def test_sys_dev_path(self):
+ """sys_dev_path returns a path under SYS_CLASS_NET for a device."""
+ dev = "something"
+ path = "attribute"
+ expected = net.SYS_CLASS_NET + dev + "/" + path
+ self.assertEqual(expected, net.sys_dev_path(dev, path))
+
+ def test_sys_dev_path_without_path(self):
+ """When path param isn't provided it defaults to empty string."""
+ dev = "something"
+ expected = net.SYS_CLASS_NET + dev + "/"
+ self.assertEqual(expected, net.sys_dev_path(dev))
+
+
+class TestReadSysNet(CiTestCase):
+ with_logs = True
+
+ def setUp(self):
+ super(TestReadSysNet, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_read_sys_net_strips_contents_of_sys_path(self):
+ """read_sys_net strips whitespace from the contents of a sys file."""
+ content = "some stuff with trailing whitespace\t\r\n"
+ write_file(os.path.join(self.sysdir, "dev", "attr"), content)
+ self.assertEqual(content.strip(), net.read_sys_net("dev", "attr"))
+
+ def test_read_sys_net_reraises_oserror(self):
+ """read_sys_net raises OSError/IOError when file doesn't exist."""
+ # Non-specific Exception because versions of python OSError vs IOError.
+ with self.assertRaises(Exception) as context_manager: # noqa: H202
+ net.read_sys_net("dev", "attr")
+ error = context_manager.exception
+ self.assertIn("No such file or directory", str(error))
+
+ def test_read_sys_net_handles_error_with_on_enoent(self):
+ """read_sys_net handles OSError/IOError with on_enoent if provided."""
+ handled_errors = []
+
+ def on_enoent(e):
+ handled_errors.append(e)
+
+ net.read_sys_net("dev", "attr", on_enoent=on_enoent)
+ error = handled_errors[0]
+ self.assertIsInstance(error, Exception)
+ self.assertIn("No such file or directory", str(error))
+
+ def test_read_sys_net_translates_content(self):
+ """read_sys_net translates content when translate dict is provided."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, "dev", "attr"), content)
+ translate = {"you're welcome": "de nada"}
+ self.assertEqual(
+ "de nada", net.read_sys_net("dev", "attr", translate=translate)
+ )
+
+ def test_read_sys_net_errors_on_translation_failures(self):
+ """read_sys_net raises a KeyError and logs details on failure."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, "dev", "attr"), content)
+ with self.assertRaises(KeyError) as context_manager:
+ net.read_sys_net("dev", "attr", translate={})
+ error = context_manager.exception
+ self.assertEqual('"you\'re welcome"', str(error))
+ self.assertIn(
+ "Found unexpected (not translatable) value 'you're welcome' in "
+ "'{0}dev/attr".format(self.sysdir),
+ self.logs.getvalue(),
+ )
+
+ def test_read_sys_net_handles_handles_with_onkeyerror(self):
+ """read_sys_net handles translation errors calling on_keyerror."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, "dev", "attr"), content)
+ handled_errors = []
+
+ def on_keyerror(e):
+ handled_errors.append(e)
+
+ net.read_sys_net("dev", "attr", translate={}, on_keyerror=on_keyerror)
+ error = handled_errors[0]
+ self.assertIsInstance(error, KeyError)
+ self.assertEqual('"you\'re welcome"', str(error))
+
+ def test_read_sys_net_safe_false_on_translate_failure(self):
+ """read_sys_net_safe returns False on translation failures."""
+ content = "you're welcome\n"
+ write_file(os.path.join(self.sysdir, "dev", "attr"), content)
+ self.assertFalse(net.read_sys_net_safe("dev", "attr", translate={}))
+
+ def test_read_sys_net_safe_returns_false_on_noent_failure(self):
+ """read_sys_net_safe returns False on file not found failures."""
+ self.assertFalse(net.read_sys_net_safe("dev", "attr"))
+
+ def test_read_sys_net_int_returns_none_on_error(self):
+ """read_sys_net_safe returns None on failures."""
+ self.assertFalse(net.read_sys_net_int("dev", "attr"))
+
+ def test_read_sys_net_int_returns_none_on_valueerror(self):
+ """read_sys_net_safe returns None when content is not an int."""
+ write_file(os.path.join(self.sysdir, "dev", "attr"), "NOTINT\n")
+ self.assertFalse(net.read_sys_net_int("dev", "attr"))
+
+ def test_read_sys_net_int_returns_integer_from_content(self):
+ """read_sys_net_safe returns None on failures."""
+ write_file(os.path.join(self.sysdir, "dev", "attr"), "1\n")
+ self.assertEqual(1, net.read_sys_net_int("dev", "attr"))
+
+ def test_is_up_true(self):
+ """is_up is True if sys/net/devname/operstate is 'up' or 'unknown'."""
+ for state in ["up", "unknown"]:
+ write_file(os.path.join(self.sysdir, "eth0", "operstate"), state)
+ self.assertTrue(net.is_up("eth0"))
+
+ def test_is_up_false(self):
+ """is_up is False if sys/net/devname/operstate is 'down' or invalid."""
+ for state in ["down", "incomprehensible"]:
+ write_file(os.path.join(self.sysdir, "eth0", "operstate"), state)
+ self.assertFalse(net.is_up("eth0"))
+
+ def test_is_bridge(self):
+ """is_bridge is True when /sys/net/devname/bridge exists."""
+ self.assertFalse(net.is_bridge("eth0"))
+ ensure_file(os.path.join(self.sysdir, "eth0", "bridge"))
+ self.assertTrue(net.is_bridge("eth0"))
+
+ def test_is_bond(self):
+ """is_bond is True when /sys/net/devname/bonding exists."""
+ self.assertFalse(net.is_bond("eth0"))
+ ensure_file(os.path.join(self.sysdir, "eth0", "bonding"))
+ self.assertTrue(net.is_bond("eth0"))
+
+ def test_get_master(self):
+ """get_master returns the path when /sys/net/devname/master exists."""
+ self.assertIsNone(net.get_master("enP1s1"))
+ master_path = os.path.join(self.sysdir, "enP1s1", "master")
+ ensure_file(master_path)
+ self.assertEqual(master_path, net.get_master("enP1s1"))
+
+ def test_master_is_bridge_or_bond(self):
+ bridge_mac = "aa:bb:cc:aa:bb:cc"
+ bond_mac = "cc:bb:aa:cc:bb:aa"
+
+ # No master => False
+ write_file(os.path.join(self.sysdir, "eth1", "address"), bridge_mac)
+ write_file(os.path.join(self.sysdir, "eth2", "address"), bond_mac)
+
+ self.assertFalse(net.master_is_bridge_or_bond("eth1"))
+ self.assertFalse(net.master_is_bridge_or_bond("eth2"))
+
+ # masters without bridge/bonding => False
+ write_file(os.path.join(self.sysdir, "br0", "address"), bridge_mac)
+ write_file(os.path.join(self.sysdir, "bond0", "address"), bond_mac)
+
+ os.symlink("../br0", os.path.join(self.sysdir, "eth1", "master"))
+ os.symlink("../bond0", os.path.join(self.sysdir, "eth2", "master"))
+
+ self.assertFalse(net.master_is_bridge_or_bond("eth1"))
+ self.assertFalse(net.master_is_bridge_or_bond("eth2"))
+
+ # masters with bridge/bonding => True
+ write_file(os.path.join(self.sysdir, "br0", "bridge"), "")
+ write_file(os.path.join(self.sysdir, "bond0", "bonding"), "")
+
+ self.assertTrue(net.master_is_bridge_or_bond("eth1"))
+ self.assertTrue(net.master_is_bridge_or_bond("eth2"))
+
+ def test_master_is_openvswitch(self):
+ ovs_mac = "bb:cc:aa:bb:cc:aa"
+
+ # No master => False
+ write_file(os.path.join(self.sysdir, "eth1", "address"), ovs_mac)
+
+ self.assertFalse(net.master_is_bridge_or_bond("eth1"))
+
+ # masters without ovs-system => False
+ write_file(os.path.join(self.sysdir, "ovs-system", "address"), ovs_mac)
+
+ os.symlink(
+ "../ovs-system", os.path.join(self.sysdir, "eth1", "master")
+ )
+
+ self.assertFalse(net.master_is_openvswitch("eth1"))
+
+ # masters with ovs-system => True
+ os.symlink(
+ "../ovs-system",
+ os.path.join(self.sysdir, "eth1", "upper_ovs-system"),
+ )
+
+ self.assertTrue(net.master_is_openvswitch("eth1"))
+
+ def test_is_vlan(self):
+ """is_vlan is True when /sys/net/devname/uevent has DEVTYPE=vlan."""
+ ensure_file(os.path.join(self.sysdir, "eth0", "uevent"))
+ self.assertFalse(net.is_vlan("eth0"))
+ content = "junk\nDEVTYPE=vlan\njunk\n"
+ write_file(os.path.join(self.sysdir, "eth0", "uevent"), content)
+ self.assertTrue(net.is_vlan("eth0"))
+
+
+class TestGenerateFallbackConfig(CiTestCase):
+ def setUp(self):
+ super(TestGenerateFallbackConfig, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+ self.add_patch(
+ "cloudinit.net.util.is_container",
+ "m_is_container",
+ return_value=False,
+ )
+ self.add_patch("cloudinit.net.util.udevadm_settle", "m_settle")
+ self.add_patch(
+ "cloudinit.net.is_netfailover", "m_netfail", return_value=False
+ )
+ self.add_patch(
+ "cloudinit.net.is_netfail_master",
+ "m_netfail_master",
+ return_value=False,
+ )
+
+ def test_generate_fallback_finds_connected_eth_with_mac(self):
+ """generate_fallback_config finds any connected device with a mac."""
+ write_file(os.path.join(self.sysdir, "eth0", "carrier"), "1")
+ write_file(os.path.join(self.sysdir, "eth1", "carrier"), "1")
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth1", "address"), mac)
+ expected = {
+ "ethernets": {
+ "eth1": {
+ "match": {"macaddress": mac},
+ "dhcp4": True,
+ "set-name": "eth1",
+ }
+ },
+ "version": 2,
+ }
+ self.assertEqual(expected, net.generate_fallback_config())
+
+ def test_generate_fallback_finds_dormant_eth_with_mac(self):
+ """generate_fallback_config finds any dormant device with a mac."""
+ write_file(os.path.join(self.sysdir, "eth0", "dormant"), "1")
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth0", "address"), mac)
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "match": {"macaddress": mac},
+ "dhcp4": True,
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
+ self.assertEqual(expected, net.generate_fallback_config())
+
+ def test_generate_fallback_finds_eth_by_operstate(self):
+ """generate_fallback_config finds any dormant device with a mac."""
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth0", "address"), mac)
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "match": {"macaddress": mac},
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
+ valid_operstates = ["dormant", "down", "lowerlayerdown", "unknown"]
+ for state in valid_operstates:
+ write_file(os.path.join(self.sysdir, "eth0", "operstate"), state)
+ self.assertEqual(expected, net.generate_fallback_config())
+ write_file(os.path.join(self.sysdir, "eth0", "operstate"), "noworky")
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_veth(self):
+ """generate_fallback_config will skip any veth interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, "veth0", "carrier"), "1")
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_bridges(self):
+ """generate_fallback_config will skip any bridges interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, "eth0", "carrier"), "1")
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth0", "address"), mac)
+ ensure_file(os.path.join(self.sysdir, "eth0", "bridge"))
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_bonds(self):
+ """generate_fallback_config will skip any bonded interfaces."""
+ # A connected veth which gets ignored
+ write_file(os.path.join(self.sysdir, "eth0", "carrier"), "1")
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth0", "address"), mac)
+ ensure_file(os.path.join(self.sysdir, "eth0", "bonding"))
+ self.assertIsNone(net.generate_fallback_config())
+
+ def test_generate_fallback_config_skips_netfail_devs(self):
+ """gen_fallback_config ignores netfail primary,sby no mac on master."""
+ mac = "aa:bb:cc:aa:bb:cc" # netfailover devs share the same mac
+ for iface in ["ens3", "ens3sby", "enP0s1f3"]:
+ write_file(os.path.join(self.sysdir, iface, "carrier"), "1")
+ write_file(
+ os.path.join(self.sysdir, iface, "addr_assign_type"), "0"
+ )
+ write_file(os.path.join(self.sysdir, iface, "address"), mac)
+
+ def is_netfail(iface, _driver=None):
+ # ens3 is the master
+ if iface == "ens3":
+ return False
+ return True
+
+ self.m_netfail.side_effect = is_netfail
+
+ def is_netfail_master(iface, _driver=None):
+ # ens3 is the master
+ if iface == "ens3":
+ return True
+ return False
+
+ self.m_netfail_master.side_effect = is_netfail_master
+ expected = {
+ "ethernets": {
+ "ens3": {
+ "dhcp4": True,
+ "match": {"name": "ens3"},
+ "set-name": "ens3",
+ }
+ },
+ "version": 2,
+ }
+ result = net.generate_fallback_config()
+ self.assertEqual(expected, result)
+
+
+class TestNetFindFallBackNic(CiTestCase):
+ def setUp(self):
+ super(TestNetFindFallBackNic, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+ self.add_patch(
+ "cloudinit.net.util.is_container",
+ "m_is_container",
+ return_value=False,
+ )
+ self.add_patch("cloudinit.net.util.udevadm_settle", "m_settle")
+
+ def test_generate_fallback_finds_first_connected_eth_with_mac(self):
+ """find_fallback_nic finds any connected device with a mac."""
+ write_file(os.path.join(self.sysdir, "eth0", "carrier"), "1")
+ write_file(os.path.join(self.sysdir, "eth1", "carrier"), "1")
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth1", "address"), mac)
+ self.assertEqual("eth1", net.find_fallback_nic())
+
+
+class TestGetDeviceList(CiTestCase):
+ def setUp(self):
+ super(TestGetDeviceList, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_get_devicelist_raise_oserror(self):
+ """get_devicelist raise any non-ENOENT OSerror."""
+ error = OSError("Can not do it")
+ error.errno = errno.EPERM # Set non-ENOENT
+ self.m_sys_path.side_effect = error
+ with self.assertRaises(OSError) as context_manager:
+ net.get_devicelist()
+ exception = context_manager.exception
+ self.assertEqual("Can not do it", str(exception))
+
+ def test_get_devicelist_empty_without_sys_net(self):
+ """get_devicelist returns empty list when missing SYS_CLASS_NET."""
+ self.m_sys_path.return_value = "idontexist"
+ self.assertEqual([], net.get_devicelist())
+
+ def test_get_devicelist_empty_with_no_devices_in_sys_net(self):
+ """get_devicelist returns empty directoty listing for SYS_CLASS_NET."""
+ self.assertEqual([], net.get_devicelist())
+
+ def test_get_devicelist_lists_any_subdirectories_in_sys_net(self):
+ """get_devicelist returns a directory listing for SYS_CLASS_NET."""
+ write_file(os.path.join(self.sysdir, "eth0", "operstate"), "up")
+ write_file(os.path.join(self.sysdir, "eth1", "operstate"), "up")
+ self.assertCountEqual(["eth0", "eth1"], net.get_devicelist())
+
+
+@mock.patch(
+ "cloudinit.net.is_openvswitch_internal_interface",
+ mock.Mock(return_value=False),
+)
+class TestGetInterfaceMAC(CiTestCase):
+ def setUp(self):
+ super(TestGetInterfaceMAC, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_get_interface_mac_false_with_no_mac(self):
+ """get_device_list returns False when no mac is reported."""
+ ensure_file(os.path.join(self.sysdir, "eth0", "bonding"))
+ mac_path = os.path.join(self.sysdir, "eth0", "address")
+ self.assertFalse(os.path.exists(mac_path))
+ self.assertFalse(net.get_interface_mac("eth0"))
+
+ def test_get_interface_mac(self):
+ """get_interfaces returns the mac from SYS_CLASS_NET/dev/address."""
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth1", "address"), mac)
+ self.assertEqual(mac, net.get_interface_mac("eth1"))
+
+ def test_get_interface_mac_grabs_bonding_address(self):
+ """get_interfaces returns the source device mac for bonded devices."""
+ source_dev_mac = "aa:bb:cc:aa:bb:cc"
+ bonded_mac = "dd:ee:ff:dd:ee:ff"
+ write_file(os.path.join(self.sysdir, "eth1", "address"), bonded_mac)
+ write_file(
+ os.path.join(self.sysdir, "eth1", "bonding_slave", "perm_hwaddr"),
+ source_dev_mac,
+ )
+ self.assertEqual(source_dev_mac, net.get_interface_mac("eth1"))
+
+ def test_get_interfaces_empty_list_without_sys_net(self):
+ """get_interfaces returns an empty list when missing SYS_CLASS_NET."""
+ self.m_sys_path.return_value = "idontexist"
+ self.assertEqual([], net.get_interfaces())
+
+ def test_get_interfaces_by_mac_skips_empty_mac(self):
+ """Ignore 00:00:00:00:00:00 addresses from get_interfaces_by_mac."""
+ empty_mac = "00:00:00:00:00:00"
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth1", "address"), empty_mac)
+ write_file(os.path.join(self.sysdir, "eth1", "addr_assign_type"), "0")
+ write_file(os.path.join(self.sysdir, "eth2", "addr_assign_type"), "0")
+ write_file(os.path.join(self.sysdir, "eth2", "address"), mac)
+ expected = [("eth2", "aa:bb:cc:aa:bb:cc", None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+ def test_get_interfaces_by_mac_skips_missing_mac(self):
+ """Ignore interfaces without an address from get_interfaces_by_mac."""
+ write_file(os.path.join(self.sysdir, "eth1", "addr_assign_type"), "0")
+ address_path = os.path.join(self.sysdir, "eth1", "address")
+ self.assertFalse(os.path.exists(address_path))
+ mac = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth2", "addr_assign_type"), "0")
+ write_file(os.path.join(self.sysdir, "eth2", "address"), mac)
+ expected = [("eth2", "aa:bb:cc:aa:bb:cc", None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+ def test_get_interfaces_by_mac_skips_master_devs(self):
+ """Ignore interfaces with a master device which would have dup mac."""
+ mac1 = mac2 = "aa:bb:cc:aa:bb:cc"
+ write_file(os.path.join(self.sysdir, "eth1", "addr_assign_type"), "0")
+ write_file(os.path.join(self.sysdir, "eth1", "address"), mac1)
+ write_file(os.path.join(self.sysdir, "eth1", "master"), "blah")
+ write_file(os.path.join(self.sysdir, "eth2", "addr_assign_type"), "0")
+ write_file(os.path.join(self.sysdir, "eth2", "address"), mac2)
+ expected = [("eth2", mac2, None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+ @mock.patch("cloudinit.net.is_netfailover")
+ def test_get_interfaces_by_mac_skips_netfailvoer(self, m_netfail):
+ """Ignore interfaces if netfailover primary or standby."""
+ mac = "aa:bb:cc:aa:bb:cc" # netfailover devs share the same mac
+ for iface in ["ens3", "ens3sby", "enP0s1f3"]:
+ write_file(
+ os.path.join(self.sysdir, iface, "addr_assign_type"), "0"
+ )
+ write_file(os.path.join(self.sysdir, iface, "address"), mac)
+
+ def is_netfail(iface, _driver=None):
+ # ens3 is the master
+ if iface == "ens3":
+ return False
+ else:
+ return True
+
+ m_netfail.side_effect = is_netfail
+ expected = [("ens3", mac, None, None)]
+ self.assertEqual(expected, net.get_interfaces())
+
+ def test_get_interfaces_does_not_skip_phys_members_of_bridges_and_bonds(
+ self,
+ ):
+ bridge_mac = "aa:bb:cc:aa:bb:cc"
+ bond_mac = "cc:bb:aa:cc:bb:aa"
+ ovs_mac = "bb:cc:aa:bb:cc:aa"
+
+ write_file(os.path.join(self.sysdir, "br0", "address"), bridge_mac)
+ write_file(os.path.join(self.sysdir, "br0", "bridge"), "")
+
+ write_file(os.path.join(self.sysdir, "bond0", "address"), bond_mac)
+ write_file(os.path.join(self.sysdir, "bond0", "bonding"), "")
+
+ write_file(os.path.join(self.sysdir, "ovs-system", "address"), ovs_mac)
+
+ write_file(os.path.join(self.sysdir, "eth1", "address"), bridge_mac)
+ os.symlink("../br0", os.path.join(self.sysdir, "eth1", "master"))
+
+ write_file(os.path.join(self.sysdir, "eth2", "address"), bond_mac)
+ os.symlink("../bond0", os.path.join(self.sysdir, "eth2", "master"))
+
+ write_file(os.path.join(self.sysdir, "eth3", "address"), ovs_mac)
+ os.symlink(
+ "../ovs-system", os.path.join(self.sysdir, "eth3", "master")
+ )
+ os.symlink(
+ "../ovs-system",
+ os.path.join(self.sysdir, "eth3", "upper_ovs-system"),
+ )
+
+ interface_names = [interface[0] for interface in net.get_interfaces()]
+ self.assertEqual(
+ ["eth1", "eth2", "eth3", "ovs-system"], sorted(interface_names)
+ )
+
+
+class TestInterfaceHasOwnMAC(CiTestCase):
+ def setUp(self):
+ super(TestInterfaceHasOwnMAC, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_interface_has_own_mac_false_when_stolen(self):
+ """Return False from interface_has_own_mac when address is stolen."""
+ write_file(os.path.join(self.sysdir, "eth1", "addr_assign_type"), "2")
+ self.assertFalse(net.interface_has_own_mac("eth1"))
+
+ def test_interface_has_own_mac_true_when_not_stolen(self):
+ """Return False from interface_has_own_mac when mac isn't stolen."""
+ valid_assign_types = ["0", "1", "3"]
+ assign_path = os.path.join(self.sysdir, "eth1", "addr_assign_type")
+ for _type in valid_assign_types:
+ write_file(assign_path, _type)
+ self.assertTrue(net.interface_has_own_mac("eth1"))
+
+ def test_interface_has_own_mac_strict_errors_on_absent_assign_type(self):
+ """When addr_assign_type is absent, interface_has_own_mac errors."""
+ with self.assertRaises(ValueError):
+ net.interface_has_own_mac("eth1", strict=True)
+
+
+@mock.patch("cloudinit.net.subp.subp")
+class TestEphemeralIPV4Network(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestEphemeralIPV4Network, self).setUp()
+ sys_mock = mock.patch("cloudinit.net.get_sys_class_path")
+ self.m_sys_path = sys_mock.start()
+ self.sysdir = self.tmp_dir() + "/"
+ self.m_sys_path.return_value = self.sysdir
+ self.addCleanup(sys_mock.stop)
+
+ def test_ephemeral_ipv4_network_errors_on_missing_params(self, m_subp):
+ """No required params for EphemeralIPv4Network can be None."""
+ required_params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ }
+ for key in required_params.keys():
+ params = copy.deepcopy(required_params)
+ params[key] = None
+ with self.assertRaises(ValueError) as context_manager:
+ net.EphemeralIPv4Network(**params)
+ error = context_manager.exception
+ self.assertIn("Cannot init network on", str(error))
+ self.assertEqual(0, m_subp.call_count)
+
+ def test_ephemeral_ipv4_network_errors_invalid_mask_prefix(self, m_subp):
+ """Raise an error when prefix_or_mask is not a netmask or prefix."""
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "broadcast": "192.168.2.255",
+ }
+ invalid_masks = ("invalid", "invalid.", "123.123.123")
+ for error_val in invalid_masks:
+ params["prefix_or_mask"] = error_val
+ with self.assertRaises(ValueError) as context_manager:
+ with net.EphemeralIPv4Network(**params):
+ pass
+ error = context_manager.exception
+ self.assertIn(
+ "Cannot setup network, invalid prefix or netmask: ", str(error)
+ )
+ self.assertEqual(0, m_subp.call_count)
+
+ def test_ephemeral_ipv4_network_performs_teardown(self, m_subp):
+ """EphemeralIPv4Network performs teardown on the device if setup."""
+ expected_setup_calls = [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/24",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ ),
+ mock.call(
+ ["ip", "-family", "inet", "link", "set", "dev", "eth0", "up"],
+ capture=True,
+ ),
+ ]
+ expected_teardown_calls = [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "link",
+ "set",
+ "dev",
+ "eth0",
+ "down",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "del",
+ "192.168.2.2/24",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ ]
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ }
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(expected_setup_calls, m_subp.call_args_list)
+ m_subp.assert_has_calls(expected_teardown_calls)
+
+ @mock.patch("cloudinit.net.readurl")
+ def test_ephemeral_ipv4_no_network_if_url_connectivity(
+ self, m_readurl, m_subp
+ ):
+ """No network setup is performed if we can successfully connect to
+ connectivity_url."""
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ "connectivity_url_data": {"url": "http://example.org/index.html"},
+ }
+
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(
+ [mock.call(url="http://example.org/index.html", timeout=5)],
+ m_readurl.call_args_list,
+ )
+ # Ensure that no teardown happens:
+ m_subp.assert_has_calls([])
+
+ def test_ephemeral_ipv4_network_noop_when_configured(self, m_subp):
+ """EphemeralIPv4Network handles exception when address is setup.
+
+ It performs no cleanup as the interface was already setup.
+ """
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ }
+ m_subp.side_effect = ProcessExecutionError(
+ "", "RTNETLINK answers: File exists", 2
+ )
+ expected_calls = [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/24",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ )
+ ]
+ with net.EphemeralIPv4Network(**params):
+ pass
+ self.assertEqual(expected_calls, m_subp.call_args_list)
+ self.assertIn(
+ "Skip ephemeral network setup, eth0 already has address",
+ self.logs.getvalue(),
+ )
+
+ def test_ephemeral_ipv4_network_with_prefix(self, m_subp):
+ """EphemeralIPv4Network takes a valid prefix to setup the network."""
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "24",
+ "broadcast": "192.168.2.255",
+ }
+ for prefix_val in ["24", 16]: # prefix can be int or string
+ params["prefix_or_mask"] = prefix_val
+ with net.EphemeralIPv4Network(**params):
+ pass
+ m_subp.assert_has_calls(
+ [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/24",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ )
+ ]
+ )
+ m_subp.assert_has_calls(
+ [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/16",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ )
+ ]
+ )
+
+ def test_ephemeral_ipv4_network_with_new_default_route(self, m_subp):
+ """Add the route when router is set and no default route exists."""
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.0",
+ "broadcast": "192.168.2.255",
+ "router": "192.168.2.1",
+ }
+ m_subp.return_value = "", "" # Empty response from ip route gw check
+ expected_setup_calls = [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/24",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ ),
+ mock.call(
+ ["ip", "-family", "inet", "link", "set", "dev", "eth0", "up"],
+ capture=True,
+ ),
+ mock.call(["ip", "route", "show", "0.0.0.0/0"], capture=True),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "add",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ "src",
+ "192.168.2.2",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "add",
+ "default",
+ "via",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ ]
+ expected_teardown_calls = [
+ mock.call(
+ ["ip", "-4", "route", "del", "default", "dev", "eth0"],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "del",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ "src",
+ "192.168.2.2",
+ ],
+ capture=True,
+ ),
+ ]
+
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(expected_setup_calls, m_subp.call_args_list)
+ m_subp.assert_has_calls(expected_teardown_calls)
+
+ def test_ephemeral_ipv4_network_with_rfc3442_static_routes(self, m_subp):
+ params = {
+ "interface": "eth0",
+ "ip": "192.168.2.2",
+ "prefix_or_mask": "255.255.255.255",
+ "broadcast": "192.168.2.255",
+ "static_routes": [
+ ("192.168.2.1/32", "0.0.0.0"),
+ ("169.254.169.254/32", "192.168.2.1"),
+ ("0.0.0.0/0", "192.168.2.1"),
+ ],
+ "router": "192.168.2.1",
+ }
+ expected_setup_calls = [
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "add",
+ "192.168.2.2/32",
+ "broadcast",
+ "192.168.2.255",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ update_env={"LANG": "C"},
+ ),
+ mock.call(
+ ["ip", "-family", "inet", "link", "set", "dev", "eth0", "up"],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "append",
+ "192.168.2.1/32",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "append",
+ "169.254.169.254/32",
+ "via",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "append",
+ "0.0.0.0/0",
+ "via",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ ]
+ expected_teardown_calls = [
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "del",
+ "0.0.0.0/0",
+ "via",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-4",
+ "route",
+ "del",
+ "169.254.169.254/32",
+ "via",
+ "192.168.2.1",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ ["ip", "-4", "route", "del", "192.168.2.1/32", "dev", "eth0"],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "link",
+ "set",
+ "dev",
+ "eth0",
+ "down",
+ ],
+ capture=True,
+ ),
+ mock.call(
+ [
+ "ip",
+ "-family",
+ "inet",
+ "addr",
+ "del",
+ "192.168.2.2/32",
+ "dev",
+ "eth0",
+ ],
+ capture=True,
+ ),
+ ]
+ with net.EphemeralIPv4Network(**params):
+ self.assertEqual(expected_setup_calls, m_subp.call_args_list)
+ m_subp.assert_has_calls(expected_setup_calls + expected_teardown_calls)
+
+
+class TestApplyNetworkCfgNames(CiTestCase):
+ V1_CONFIG = textwrap.dedent(
+ """\
+ version: 1
+ config:
+ - type: physical
+ name: interface0
+ mac_address: "52:54:00:12:34:00"
+ subnets:
+ - type: static
+ address: 10.0.2.15
+ netmask: 255.255.255.0
+ gateway: 10.0.2.2
+ """
+ )
+ V2_CONFIG = textwrap.dedent(
+ """\
+ version: 2
+ ethernets:
+ interface0:
+ match:
+ macaddress: "52:54:00:12:34:00"
+ addresses:
+ - 10.0.2.15/24
+ gateway4: 10.0.2.2
+ set-name: interface0
+ """
+ )
+
+ V2_CONFIG_NO_SETNAME = textwrap.dedent(
+ """\
+ version: 2
+ ethernets:
+ interface0:
+ match:
+ macaddress: "52:54:00:12:34:00"
+ addresses:
+ - 10.0.2.15/24
+ gateway4: 10.0.2.2
+ """
+ )
+
+ V2_CONFIG_NO_MAC = textwrap.dedent(
+ """\
+ version: 2
+ ethernets:
+ interface0:
+ match:
+ driver: virtio-net
+ addresses:
+ - 10.0.2.15/24
+ gateway4: 10.0.2.2
+ set-name: interface0
+ """
+ )
+
+ @mock.patch("cloudinit.net.device_devid")
+ @mock.patch("cloudinit.net.device_driver")
+ @mock.patch("cloudinit.net._rename_interfaces")
+ def test_apply_v1_renames(
+ self, m_rename_interfaces, m_device_driver, m_device_devid
+ ):
+ m_device_driver.return_value = "virtio_net"
+ m_device_devid.return_value = "0x15d8"
+
+ net.apply_network_config_names(yaml.load(self.V1_CONFIG))
+
+ call = ["52:54:00:12:34:00", "interface0", "virtio_net", "0x15d8"]
+ m_rename_interfaces.assert_called_with([call])
+
+ @mock.patch("cloudinit.net.device_devid")
+ @mock.patch("cloudinit.net.device_driver")
+ @mock.patch("cloudinit.net._rename_interfaces")
+ def test_apply_v2_renames(
+ self, m_rename_interfaces, m_device_driver, m_device_devid
+ ):
+ m_device_driver.return_value = "virtio_net"
+ m_device_devid.return_value = "0x15d8"
+
+ net.apply_network_config_names(yaml.load(self.V2_CONFIG))
+
+ call = ["52:54:00:12:34:00", "interface0", "virtio_net", "0x15d8"]
+ m_rename_interfaces.assert_called_with([call])
+
+ @mock.patch("cloudinit.net._rename_interfaces")
+ def test_apply_v2_renames_skips_without_setname(self, m_rename_interfaces):
+ net.apply_network_config_names(yaml.load(self.V2_CONFIG_NO_SETNAME))
+ m_rename_interfaces.assert_called_with([])
+
+ @mock.patch("cloudinit.net._rename_interfaces")
+ def test_apply_v2_renames_skips_without_mac(self, m_rename_interfaces):
+ net.apply_network_config_names(yaml.load(self.V2_CONFIG_NO_MAC))
+ m_rename_interfaces.assert_called_with([])
+
+ def test_apply_v2_renames_raises_runtime_error_on_unknown_version(self):
+ with self.assertRaises(RuntimeError):
+ net.apply_network_config_names(yaml.load("version: 3"))
+
+
+class TestHasURLConnectivity(HttprettyTestCase):
+ def setUp(self):
+ super(TestHasURLConnectivity, self).setUp()
+ self.url = "http://fake/"
+ self.kwargs = {"allow_redirects": True, "timeout": 5.0}
+
+ @mock.patch("cloudinit.net.readurl")
+ def test_url_timeout_on_connectivity_check(self, m_readurl):
+ """A timeout of 5 seconds is provided when reading a url."""
+ self.assertTrue(
+ net.has_url_connectivity({"url": self.url}),
+ "Expected True on url connect",
+ )
+
+ def test_true_on_url_connectivity_success(self):
+ httpretty.register_uri(httpretty.GET, self.url)
+ self.assertTrue(
+ net.has_url_connectivity({"url": self.url}),
+ "Expected True on url connect",
+ )
+
+ @mock.patch("requests.Session.request")
+ def test_true_on_url_connectivity_timeout(self, m_request):
+ """A timeout raised accessing the url will return False."""
+ m_request.side_effect = requests.Timeout("Fake Connection Timeout")
+ self.assertFalse(
+ net.has_url_connectivity({"url": self.url}),
+ "Expected False on url timeout",
+ )
+
+ def test_true_on_url_connectivity_failure(self):
+ httpretty.register_uri(httpretty.GET, self.url, body={}, status=404)
+ self.assertFalse(
+ net.has_url_connectivity({"url": self.url}),
+ "Expected False on url fail",
+ )
+
+
+def _mk_v1_phys(mac, name, driver, device_id):
+ v1_cfg = {"type": "physical", "name": name, "mac_address": mac}
+ params = {}
+ if driver:
+ params.update({"driver": driver})
+ if device_id:
+ params.update({"device_id": device_id})
+
+ if params:
+ v1_cfg.update({"params": params})
+
+ return v1_cfg
+
+
+def _mk_v2_phys(mac, name, driver=None, device_id=None):
+ v2_cfg = {"set-name": name, "match": {"macaddress": mac}}
+ if driver:
+ v2_cfg["match"].update({"driver": driver})
+ if device_id:
+ v2_cfg["match"].update({"device_id": device_id})
+
+ return v2_cfg
+
+
+class TestExtractPhysdevs(CiTestCase):
+ def setUp(self):
+ super(TestExtractPhysdevs, self).setUp()
+ self.add_patch("cloudinit.net.device_driver", "m_driver")
+ self.add_patch("cloudinit.net.device_devid", "m_devid")
+
+ def test_extract_physdevs_looks_up_driver_v1(self):
+ driver = "virtio"
+ self.m_driver.return_value = driver
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", None, "0x1000"],
+ ]
+ netcfg = {
+ "version": 1,
+ "config": [_mk_v1_phys(*args) for args in physdevs],
+ }
+ # insert the driver value for verification
+ physdevs[0][2] = driver
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+ self.m_driver.assert_called_with("eth0")
+
+ def test_extract_physdevs_looks_up_driver_v2(self):
+ driver = "virtio"
+ self.m_driver.return_value = driver
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", None, "0x1000"],
+ ]
+ netcfg = {
+ "version": 2,
+ "ethernets": {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ # insert the driver value for verification
+ physdevs[0][2] = driver
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+ self.m_driver.assert_called_with("eth0")
+
+ def test_extract_physdevs_looks_up_devid_v1(self):
+ devid = "0x1000"
+ self.m_devid.return_value = devid
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", None],
+ ]
+ netcfg = {
+ "version": 1,
+ "config": [_mk_v1_phys(*args) for args in physdevs],
+ }
+ # insert the driver value for verification
+ physdevs[0][3] = devid
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+ self.m_devid.assert_called_with("eth0")
+
+ def test_extract_physdevs_looks_up_devid_v2(self):
+ devid = "0x1000"
+ self.m_devid.return_value = devid
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", None],
+ ]
+ netcfg = {
+ "version": 2,
+ "ethernets": {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ # insert the driver value for verification
+ physdevs[0][3] = devid
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+ self.m_devid.assert_called_with("eth0")
+
+ def test_get_v1_type_physical(self):
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", "0x1000"],
+ ["00:11:22:33:44:55", "ens3", "e1000", "0x1643"],
+ ["09:87:65:43:21:10", "ens0p1", "mlx4_core", "0:0:1000"],
+ ]
+ netcfg = {
+ "version": 1,
+ "config": [_mk_v1_phys(*args) for args in physdevs],
+ }
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+
+ def test_get_v2_type_physical(self):
+ physdevs = [
+ ["aa:bb:cc:dd:ee:ff", "eth0", "virtio", "0x1000"],
+ ["00:11:22:33:44:55", "ens3", "e1000", "0x1643"],
+ ["09:87:65:43:21:10", "ens0p1", "mlx4_core", "0:0:1000"],
+ ]
+ netcfg = {
+ "version": 2,
+ "ethernets": {args[1]: _mk_v2_phys(*args) for args in physdevs},
+ }
+ self.assertEqual(
+ sorted(physdevs), sorted(net.extract_physdevs(netcfg))
+ )
+
+ def test_get_v2_type_physical_skips_if_no_set_name(self):
+ netcfg = {
+ "version": 2,
+ "ethernets": {
+ "ens3": {
+ "match": {"macaddress": "00:11:22:33:44:55"},
+ }
+ },
+ }
+ self.assertEqual([], net.extract_physdevs(netcfg))
+
+ def test_runtime_error_on_unknown_netcfg_version(self):
+ with self.assertRaises(RuntimeError):
+ net.extract_physdevs({"version": 3, "awesome_config": []})
+
+
+class TestNetFailOver(CiTestCase):
+ def setUp(self):
+ super(TestNetFailOver, self).setUp()
+ self.add_patch("cloudinit.net.util", "m_util")
+ self.add_patch("cloudinit.net.read_sys_net", "m_read_sys_net")
+ self.add_patch("cloudinit.net.device_driver", "m_device_driver")
+
+ def test_get_dev_features(self):
+ devname = self.random_string()
+ features = self.random_string()
+ self.m_read_sys_net.return_value = features
+
+ self.assertEqual(features, net.get_dev_features(devname))
+ self.assertEqual(1, self.m_read_sys_net.call_count)
+ self.assertEqual(
+ mock.call(devname, "device/features"),
+ self.m_read_sys_net.call_args_list[0],
+ )
+
+ def test_get_dev_features_none_returns_empty_string(self):
+ devname = self.random_string()
+ self.m_read_sys_net.side_effect = Exception("error")
+ self.assertEqual("", net.get_dev_features(devname))
+ self.assertEqual(1, self.m_read_sys_net.call_count)
+ self.assertEqual(
+ mock.call(devname, "device/features"),
+ self.m_read_sys_net.call_args_list[0],
+ )
+
+ @mock.patch("cloudinit.net.get_dev_features")
+ def test_has_netfail_standby_feature(self, m_dev_features):
+ devname = self.random_string()
+ standby_features = ("0" * 62) + "1" + "0"
+ m_dev_features.return_value = standby_features
+ self.assertTrue(net.has_netfail_standby_feature(devname))
+
+ @mock.patch("cloudinit.net.get_dev_features")
+ def test_has_netfail_standby_feature_short_is_false(self, m_dev_features):
+ devname = self.random_string()
+ standby_features = self.random_string()
+ m_dev_features.return_value = standby_features
+ self.assertFalse(net.has_netfail_standby_feature(devname))
+
+ @mock.patch("cloudinit.net.get_dev_features")
+ def test_has_netfail_standby_feature_not_present_is_false(
+ self, m_dev_features
+ ):
+ devname = self.random_string()
+ standby_features = "0" * 64
+ m_dev_features.return_value = standby_features
+ self.assertFalse(net.has_netfail_standby_feature(devname))
+
+ @mock.patch("cloudinit.net.get_dev_features")
+ def test_has_netfail_standby_feature_no_features_is_false(
+ self, m_dev_features
+ ):
+ devname = self.random_string()
+ standby_features = None
+ m_dev_features.return_value = standby_features
+ self.assertFalse(net.has_netfail_standby_feature(devname))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_master(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = False # no master sysfs attr
+ m_standby.return_value = True # has standby feature flag
+ self.assertTrue(net.is_netfail_master(devname, driver))
+
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_master_checks_master_attr(self, m_sysdev):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_sysdev.return_value = self.random_string()
+ self.assertFalse(net.is_netfail_master(devname, driver))
+ self.assertEqual(1, m_sysdev.call_count)
+ self.assertEqual(
+ mock.call(devname, path="master"), m_sysdev.call_args_list[0]
+ )
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_master_wrong_driver(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = self.random_string()
+ self.assertFalse(net.is_netfail_master(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_master_has_master_attr(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = True # has master sysfs attr
+ self.assertFalse(net.is_netfail_master(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_master_no_standby_feat(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = False # no master sysfs attr
+ m_standby.return_value = False # no standby feature flag
+ self.assertFalse(net.is_netfail_master(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_primary(self, m_sysdev, m_exists, m_standby):
+ devname = self.random_string()
+ driver = self.random_string() # device not virtio_net
+ master_devname = self.random_string()
+ m_sysdev.return_value = "%s/%s" % (
+ self.random_string(),
+ master_devname,
+ )
+ m_exists.return_value = True # has master sysfs attr
+ self.m_device_driver.return_value = "virtio_net" # master virtio_net
+ m_standby.return_value = True # has standby feature flag
+ self.assertTrue(net.is_netfail_primary(devname, driver))
+ self.assertEqual(1, self.m_device_driver.call_count)
+ self.assertEqual(
+ mock.call(master_devname), self.m_device_driver.call_args_list[0]
+ )
+ self.assertEqual(1, m_standby.call_count)
+ self.assertEqual(
+ mock.call(master_devname), m_standby.call_args_list[0]
+ )
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_primary_wrong_driver(
+ self, m_sysdev, m_exists, m_standby
+ ):
+ devname = self.random_string()
+ driver = "virtio_net"
+ self.assertFalse(net.is_netfail_primary(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_primary_no_master(self, m_sysdev, m_exists, m_standby):
+ devname = self.random_string()
+ driver = self.random_string() # device not virtio_net
+ m_exists.return_value = False # no master sysfs attr
+ self.assertFalse(net.is_netfail_primary(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_primary_bad_master(
+ self, m_sysdev, m_exists, m_standby
+ ):
+ devname = self.random_string()
+ driver = self.random_string() # device not virtio_net
+ master_devname = self.random_string()
+ m_sysdev.return_value = "%s/%s" % (
+ self.random_string(),
+ master_devname,
+ )
+ m_exists.return_value = True # has master sysfs attr
+ self.m_device_driver.return_value = "XXXX" # master not virtio_net
+ self.assertFalse(net.is_netfail_primary(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ @mock.patch("cloudinit.net.sys_dev_path")
+ def test_is_netfail_primary_no_standby(
+ self, m_sysdev, m_exists, m_standby
+ ):
+ devname = self.random_string()
+ driver = self.random_string() # device not virtio_net
+ master_devname = self.random_string()
+ m_sysdev.return_value = "%s/%s" % (
+ self.random_string(),
+ master_devname,
+ )
+ m_exists.return_value = True # has master sysfs attr
+ self.m_device_driver.return_value = "virtio_net" # master virtio_net
+ m_standby.return_value = False # master has no standby feature flag
+ self.assertFalse(net.is_netfail_primary(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_standby(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = True # has master sysfs attr
+ m_standby.return_value = True # has standby feature flag
+ self.assertTrue(net.is_netfail_standby(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_standby_wrong_driver(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = self.random_string()
+ self.assertFalse(net.is_netfail_standby(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_standby_no_master(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = False # has master sysfs attr
+ self.assertFalse(net.is_netfail_standby(devname, driver))
+
+ @mock.patch("cloudinit.net.has_netfail_standby_feature")
+ @mock.patch("cloudinit.net.os.path.exists")
+ def test_is_netfail_standby_no_standby_feature(self, m_exists, m_standby):
+ devname = self.random_string()
+ driver = "virtio_net"
+ m_exists.return_value = True # has master sysfs attr
+ m_standby.return_value = False # has standby feature flag
+ self.assertFalse(net.is_netfail_standby(devname, driver))
+
+ @mock.patch("cloudinit.net.is_netfail_standby")
+ @mock.patch("cloudinit.net.is_netfail_primary")
+ def test_is_netfailover_primary(self, m_primary, m_standby):
+ devname = self.random_string()
+ driver = self.random_string()
+ m_primary.return_value = True
+ m_standby.return_value = False
+ self.assertTrue(net.is_netfailover(devname, driver))
+
+ @mock.patch("cloudinit.net.is_netfail_standby")
+ @mock.patch("cloudinit.net.is_netfail_primary")
+ def test_is_netfailover_standby(self, m_primary, m_standby):
+ devname = self.random_string()
+ driver = self.random_string()
+ m_primary.return_value = False
+ m_standby.return_value = True
+ self.assertTrue(net.is_netfailover(devname, driver))
+
+ @mock.patch("cloudinit.net.is_netfail_standby")
+ @mock.patch("cloudinit.net.is_netfail_primary")
+ def test_is_netfailover_returns_false(self, m_primary, m_standby):
+ devname = self.random_string()
+ driver = self.random_string()
+ m_primary.return_value = False
+ m_standby.return_value = False
+ self.assertFalse(net.is_netfailover(devname, driver))
+
+
+class TestOpenvswitchIsInstalled:
+ """Test cloudinit.net.openvswitch_is_installed.
+
+ Uses the ``clear_lru_cache`` local autouse fixture to allow us to test
+ despite the ``lru_cache`` decorator on the unit under test.
+ """
+
+ @pytest.fixture(autouse=True)
+ def clear_lru_cache(self):
+ net.openvswitch_is_installed.cache_clear()
+
+ @pytest.mark.parametrize(
+ "expected,which_return", [(True, "/some/path"), (False, None)]
+ )
+ @mock.patch("cloudinit.net.subp.which")
+ def test_mirrors_which_result(self, m_which, expected, which_return):
+ m_which.return_value = which_return
+ assert expected == net.openvswitch_is_installed()
+
+ @mock.patch("cloudinit.net.subp.which")
+ def test_only_calls_which_once(self, m_which):
+ net.openvswitch_is_installed()
+ net.openvswitch_is_installed()
+ assert 1 == m_which.call_count
+
+
+@mock.patch("cloudinit.net.subp.subp", return_value=("", ""))
+class TestGetOVSInternalInterfaces:
+ """Test cloudinit.net.get_ovs_internal_interfaces.
+
+ Uses the ``clear_lru_cache`` local autouse fixture to allow us to test
+ despite the ``lru_cache`` decorator on the unit under test.
+ """
+
+ @pytest.fixture(autouse=True)
+ def clear_lru_cache(self):
+ net.get_ovs_internal_interfaces.cache_clear()
+
+ def test_command_used(self, m_subp):
+ """Test we use the correct command when we call subp"""
+ net.get_ovs_internal_interfaces()
+
+ assert [
+ mock.call(net.OVS_INTERNAL_INTERFACE_LOOKUP_CMD)
+ ] == m_subp.call_args_list
+
+ def test_subp_contents_split_and_returned(self, m_subp):
+ """Test that the command output is appropriately mangled."""
+ stdout = "iface1\niface2\niface3\n"
+ m_subp.return_value = (stdout, "")
+
+ assert [
+ "iface1",
+ "iface2",
+ "iface3",
+ ] == net.get_ovs_internal_interfaces()
+
+ def test_database_connection_error_handled_gracefully(self, m_subp):
+ """Test that the error indicating OVS is down is handled gracefully."""
+ m_subp.side_effect = ProcessExecutionError(
+ stderr="database connection failed"
+ )
+
+ assert [] == net.get_ovs_internal_interfaces()
+
+ def test_other_errors_raised(self, m_subp):
+ """Test that only database connection errors are handled."""
+ m_subp.side_effect = ProcessExecutionError()
+
+ with pytest.raises(ProcessExecutionError):
+ net.get_ovs_internal_interfaces()
+
+ def test_only_runs_once(self, m_subp):
+ """Test that we cache the value."""
+ net.get_ovs_internal_interfaces()
+ net.get_ovs_internal_interfaces()
+
+ assert 1 == m_subp.call_count
+
+
+@mock.patch("cloudinit.net.get_ovs_internal_interfaces")
+@mock.patch("cloudinit.net.openvswitch_is_installed")
+class TestIsOpenVSwitchInternalInterface:
+ def test_false_if_ovs_not_installed(
+ self, m_openvswitch_is_installed, _m_get_ovs_internal_interfaces
+ ):
+ """Test that OVS' absence returns False."""
+ m_openvswitch_is_installed.return_value = False
+
+ assert not net.is_openvswitch_internal_interface("devname")
+
+ @pytest.mark.parametrize(
+ "detected_interfaces,devname,expected_return",
+ [
+ ([], "devname", False),
+ (["notdevname"], "devname", False),
+ (["devname"], "devname", True),
+ (["some", "other", "devices", "and", "ours"], "ours", True),
+ ],
+ )
+ def test_return_value_based_on_detected_interfaces(
+ self,
+ m_openvswitch_is_installed,
+ m_get_ovs_internal_interfaces,
+ detected_interfaces,
+ devname,
+ expected_return,
+ ):
+ """Test that the detected interfaces are used correctly."""
+ m_openvswitch_is_installed.return_value = True
+ m_get_ovs_internal_interfaces.return_value = detected_interfaces
+ assert expected_return == net.is_openvswitch_internal_interface(
+ devname
+ )
+
+
+class TestIsIpAddress:
+ """Tests for net.is_ip_address.
+
+ Instead of testing with values we rely on the ipaddress stdlib module to
+ handle all values correctly, so simply test that is_ip_address defers to
+ the ipaddress module correctly.
+ """
+
+ @pytest.mark.parametrize(
+ "ip_address_side_effect,expected_return",
+ (
+ (ValueError, False),
+ (lambda _: ipaddress.IPv4Address("192.168.0.1"), True),
+ (lambda _: ipaddress.IPv6Address("2001:db8::"), True),
+ ),
+ )
+ def test_is_ip_address(self, ip_address_side_effect, expected_return):
+ with mock.patch(
+ "cloudinit.net.ipaddress.ip_address",
+ side_effect=ip_address_side_effect,
+ ) as m_ip_address:
+ ret = net.is_ip_address(mock.sentinel.ip_address_in)
+ assert expected_return == ret
+ expected_call = mock.call(mock.sentinel.ip_address_in)
+ assert [expected_call] == m_ip_address.call_args_list
+
+
+class TestIsIpv4Address:
+ """Tests for net.is_ipv4_address.
+
+ Instead of testing with values we rely on the ipaddress stdlib module to
+ handle all values correctly, so simply test that is_ipv4_address defers to
+ the ipaddress module correctly.
+ """
+
+ @pytest.mark.parametrize(
+ "ipv4address_mock,expected_return",
+ (
+ (mock.Mock(side_effect=ValueError), False),
+ (
+ mock.Mock(return_value=ipaddress.IPv4Address("192.168.0.1")),
+ True,
+ ),
+ ),
+ )
+ def test_is_ip_address(self, ipv4address_mock, expected_return):
+ with mock.patch(
+ "cloudinit.net.ipaddress.IPv4Address", ipv4address_mock
+ ) as m_ipv4address:
+ ret = net.is_ipv4_address(mock.sentinel.ip_address_in)
+ assert expected_return == ret
+ expected_call = mock.call(mock.sentinel.ip_address_in)
+ assert [expected_call] == m_ipv4address.call_args_list
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/net/test_network_state.py b/tests/unittests/net/test_network_state.py
new file mode 100644
index 00000000..471d969a
--- /dev/null
+++ b/tests/unittests/net/test_network_state.py
@@ -0,0 +1,222 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+import ipaddress
+from unittest import mock
+
+import pytest
+
+from cloudinit import safeyaml
+from cloudinit.net import network_state
+from tests.unittests.helpers import CiTestCase
+
+netstate_path = "cloudinit.net.network_state"
+
+
+_V1_CONFIG_NAMESERVERS = """\
+network:
+ version: 1
+ config:
+ - type: nameserver
+ interface: {iface}
+ address:
+ - 192.168.1.1
+ - 8.8.8.8
+ search:
+ - spam.local
+ - type: nameserver
+ address:
+ - 192.168.1.0
+ - 4.4.4.4
+ search:
+ - eggs.local
+ - type: physical
+ name: eth0
+ mac_address: '00:11:22:33:44:55'
+ - type: physical
+ name: eth1
+ mac_address: '66:77:88:99:00:11'
+"""
+
+V1_CONFIG_NAMESERVERS_VALID = _V1_CONFIG_NAMESERVERS.format(iface="eth1")
+V1_CONFIG_NAMESERVERS_INVALID = _V1_CONFIG_NAMESERVERS.format(iface="eth90")
+
+V2_CONFIG_NAMESERVERS = """\
+network:
+ version: 2
+ ethernets:
+ eth0:
+ match:
+ macaddress: '00:11:22:33:44:55'
+ nameservers:
+ search: [spam.local, eggs.local]
+ addresses: [8.8.8.8]
+ eth1:
+ match:
+ macaddress: '66:77:88:99:00:11'
+ set-name: "ens92"
+ nameservers:
+ search: [foo.local, bar.local]
+ addresses: [4.4.4.4]
+"""
+
+
+class TestNetworkStateParseConfig(CiTestCase):
+ def setUp(self):
+ super(TestNetworkStateParseConfig, self).setUp()
+ nsi_path = netstate_path + ".NetworkStateInterpreter"
+ self.add_patch(nsi_path, "m_nsi")
+
+ def test_missing_version_returns_none(self):
+ ncfg = {}
+ with self.assertRaises(RuntimeError):
+ network_state.parse_net_config_data(ncfg)
+
+ def test_unknown_versions_returns_none(self):
+ ncfg = {"version": 13.2}
+ with self.assertRaises(RuntimeError):
+ network_state.parse_net_config_data(ncfg)
+
+ def test_version_2_passes_self_as_config(self):
+ ncfg = {"version": 2, "otherconfig": {}, "somemore": [1, 2, 3]}
+ network_state.parse_net_config_data(ncfg)
+ self.assertEqual(
+ [mock.call(version=2, config=ncfg)], self.m_nsi.call_args_list
+ )
+
+ def test_valid_config_gets_network_state(self):
+ ncfg = {"version": 2, "otherconfig": {}, "somemore": [1, 2, 3]}
+ result = network_state.parse_net_config_data(ncfg)
+ self.assertNotEqual(None, result)
+
+ def test_empty_v1_config_gets_network_state(self):
+ ncfg = {"version": 1, "config": []}
+ result = network_state.parse_net_config_data(ncfg)
+ self.assertNotEqual(None, result)
+
+ def test_empty_v2_config_gets_network_state(self):
+ ncfg = {"version": 2}
+ result = network_state.parse_net_config_data(ncfg)
+ self.assertNotEqual(None, result)
+
+
+class TestNetworkStateParseConfigV2(CiTestCase):
+ def test_version_2_ignores_renderer_key(self):
+ ncfg = {"version": 2, "renderer": "networkd", "ethernets": {}}
+ nsi = network_state.NetworkStateInterpreter(
+ version=ncfg["version"], config=ncfg
+ )
+ nsi.parse_config(skip_broken=False)
+ self.assertEqual(ncfg, nsi.as_dict()["config"])
+
+
+class TestNetworkStateParseNameservers:
+ def _parse_network_state_from_config(self, config):
+ yaml = safeyaml.load(config)
+ return network_state.parse_net_config_data(yaml["network"])
+
+ def test_v1_nameservers_valid(self):
+ config = self._parse_network_state_from_config(
+ V1_CONFIG_NAMESERVERS_VALID
+ )
+
+ # If an interface was specified, DNS shouldn't be in the global list
+ assert ["192.168.1.0", "4.4.4.4"] == sorted(config.dns_nameservers)
+ assert ["eggs.local"] == config.dns_searchdomains
+
+ # If an interface was specified, DNS should be part of the interface
+ for iface in config.iter_interfaces():
+ if iface["name"] == "eth1":
+ assert iface["dns"]["addresses"] == ["192.168.1.1", "8.8.8.8"]
+ assert iface["dns"]["search"] == ["spam.local"]
+ else:
+ assert "dns" not in iface
+
+ def test_v1_nameservers_invalid(self):
+ with pytest.raises(ValueError):
+ self._parse_network_state_from_config(
+ V1_CONFIG_NAMESERVERS_INVALID
+ )
+
+ def test_v2_nameservers(self):
+ config = self._parse_network_state_from_config(V2_CONFIG_NAMESERVERS)
+
+ # Ensure DNS defined on interface exists on interface
+ for iface in config.iter_interfaces():
+ if iface["name"] == "eth0":
+ assert iface["dns"] == {
+ "nameservers": ["8.8.8.8"],
+ "search": ["spam.local", "eggs.local"],
+ }
+ else:
+ assert iface["dns"] == {
+ "nameservers": ["4.4.4.4"],
+ "search": ["foo.local", "bar.local"],
+ }
+
+ # Ensure DNS defined on interface also exists globally (since there
+ # is no global DNS definitions in v2)
+ assert ["4.4.4.4", "8.8.8.8"] == sorted(config.dns_nameservers)
+ assert [
+ "bar.local",
+ "eggs.local",
+ "foo.local",
+ "spam.local",
+ ] == sorted(config.dns_searchdomains)
+
+
+class TestNetworkStateHelperFunctions(CiTestCase):
+ def test_mask_to_net_prefix_ipv4(self):
+ netmask_value = "255.255.255.0"
+ expected = 24
+ prefix_value = network_state.ipv4_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+ def test_mask_to_net_prefix_all_bits_ipv4(self):
+ netmask_value = "255.255.255.255"
+ expected = 32
+ prefix_value = network_state.ipv4_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+ def test_mask_to_net_prefix_to_many_bits_ipv4(self):
+ netmask_value = "33"
+ self.assertRaises(
+ ValueError, network_state.ipv4_mask_to_net_prefix, netmask_value
+ )
+
+ def test_mask_to_net_prefix_all_bits_ipv6(self):
+ netmask_value = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
+ expected = 128
+ prefix_value = network_state.ipv6_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+ def test_mask_to_net_prefix_ipv6(self):
+ netmask_value = "ffff:ffff:ffff:ffff::"
+ expected = 64
+ prefix_value = network_state.ipv6_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+ def test_mask_to_net_prefix_raises_value_error(self):
+ netmask_value = "ff:ff:ff:ff::"
+ self.assertRaises(
+ ValueError, network_state.ipv6_mask_to_net_prefix, netmask_value
+ )
+
+ def test_mask_to_net_prefix_to_many_bits_ipv6(self):
+ netmask_value = "129"
+ self.assertRaises(
+ ValueError, network_state.ipv6_mask_to_net_prefix, netmask_value
+ )
+
+ def test_mask_to_net_prefix_ipv4_object(self):
+ netmask_value = ipaddress.IPv4Address("255.255.255.255")
+ expected = 32
+ prefix_value = network_state.ipv4_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+ def test_mask_to_net_prefix_ipv6_object(self):
+ netmask_value = ipaddress.IPv6Address("ffff:ffff:ffff::")
+ expected = 48
+ prefix_value = network_state.ipv6_mask_to_net_prefix(netmask_value)
+ assert prefix_value == expected
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/net/test_networkd.py b/tests/unittests/net/test_networkd.py
new file mode 100644
index 00000000..ec1d04e9
--- /dev/null
+++ b/tests/unittests/net/test_networkd.py
@@ -0,0 +1,64 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import safeyaml
+from cloudinit.net import network_state, networkd
+
+V2_CONFIG_SET_NAME = """\
+network:
+ version: 2
+ ethernets:
+ eth0:
+ match:
+ macaddress: '00:11:22:33:44:55'
+ nameservers:
+ search: [spam.local, eggs.local]
+ addresses: [8.8.8.8]
+ eth1:
+ match:
+ macaddress: '66:77:88:99:00:11'
+ set-name: "ens92"
+ nameservers:
+ search: [foo.local, bar.local]
+ addresses: [4.4.4.4]
+"""
+
+V2_CONFIG_SET_NAME_RENDERED_ETH0 = """[Match]
+MACAddress=00:11:22:33:44:55
+Name=eth0
+
+[Network]
+DHCP=no
+DNS=8.8.8.8
+Domains=spam.local eggs.local
+
+"""
+
+V2_CONFIG_SET_NAME_RENDERED_ETH1 = """[Match]
+MACAddress=66:77:88:99:00:11
+Name=ens92
+
+[Network]
+DHCP=no
+DNS=4.4.4.4
+Domains=foo.local bar.local
+
+"""
+
+
+class TestNetworkdRenderState:
+ def _parse_network_state_from_config(self, config):
+ yaml = safeyaml.load(config)
+ return network_state.parse_net_config_data(yaml["network"])
+
+ def test_networkd_render_with_set_name(self):
+ ns = self._parse_network_state_from_config(V2_CONFIG_SET_NAME)
+ renderer = networkd.Renderer()
+ rendered_content = renderer._render_content(ns)
+
+ assert "eth0" in rendered_content
+ assert rendered_content["eth0"] == V2_CONFIG_SET_NAME_RENDERED_ETH0
+ assert "ens92" in rendered_content
+ assert rendered_content["ens92"] == V2_CONFIG_SET_NAME_RENDERED_ETH1
+
+
+# vi: ts=4 expandtab