# This file is part of cloud-init. See LICENSE file for license information. import httpretty import os import signal from textwrap import dedent import cloudinit.net as net from cloudinit.net.dhcp import ( InvalidDHCPLeaseFileError, maybe_perform_dhcp_discovery, parse_dhcp_lease_file, dhcp_discovery, networkd_load_leases) from cloudinit.util import ensure_file, write_file from cloudinit.tests.helpers import ( CiTestCase, HttprettyTestCase, mock, populate_dir, wrap_and_call) class TestParseDHCPLeasesFile(CiTestCase): def test_parse_empty_lease_file_errors(self): """parse_dhcp_lease_file errors when file content is empty.""" empty_file = self.tmp_path('leases') ensure_file(empty_file) with self.assertRaises(InvalidDHCPLeaseFileError) as context_manager: parse_dhcp_lease_file(empty_file) error = context_manager.exception self.assertIn('Cannot parse empty dhcp lease file', str(error)) def test_parse_malformed_lease_file_content_errors(self): """parse_dhcp_lease_file errors when file content isn't dhcp leases.""" non_lease_file = self.tmp_path('leases') write_file(non_lease_file, 'hi mom.') with self.assertRaises(InvalidDHCPLeaseFileError) as context_manager: parse_dhcp_lease_file(non_lease_file) error = context_manager.exception self.assertIn('Cannot parse dhcp lease file', str(error)) def test_parse_multiple_leases(self): """parse_dhcp_lease_file returns a list of all leases within.""" lease_file = self.tmp_path('leases') content = dedent(""" lease { interface "wlp3s0"; fixed-address 192.168.2.74; option subnet-mask 255.255.255.0; option routers 192.168.2.1; renew 4 2017/07/27 18:02:30; expire 5 2017/07/28 07:08:15; } lease { interface "wlp3s0"; fixed-address 192.168.2.74; option subnet-mask 255.255.255.0; option routers 192.168.2.1; } """) expected = [ {'interface': 'wlp3s0', 'fixed-address': '192.168.2.74', 'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1', 'renew': '4 2017/07/27 18:02:30', 'expire': '5 2017/07/28 07:08:15'}, {'interface': 'wlp3s0', 'fixed-address': '192.168.2.74', 'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}] write_file(lease_file, content) self.assertItemsEqual(expected, parse_dhcp_lease_file(lease_file)) class TestDHCPDiscoveryClean(CiTestCase): with_logs = True @mock.patch('cloudinit.net.dhcp.find_fallback_nic') def test_no_fallback_nic_found(self, m_fallback_nic): """Log and do nothing when nic is absent and no fallback is found.""" m_fallback_nic.return_value = None # No fallback nic found self.assertEqual([], maybe_perform_dhcp_discovery()) self.assertIn( 'Skip dhcp_discovery: Unable to find fallback nic.', self.logs.getvalue()) def test_provided_nic_does_not_exist(self): """When the provided nic doesn't exist, log a message and no-op.""" self.assertEqual([], maybe_perform_dhcp_discovery('idontexist')) self.assertIn( 'Skip dhcp_discovery: nic idontexist not found in get_devicelist.', self.logs.getvalue()) @mock.patch('cloudinit.net.dhcp.util.which') @mock.patch('cloudinit.net.dhcp.find_fallback_nic') def test_absent_dhclient_command(self, m_fallback, m_which): """When dhclient doesn't exist in the OS, log the issue and no-op.""" m_fallback.return_value = 'eth9' m_which.return_value = None # dhclient isn't found self.assertEqual([], maybe_perform_dhcp_discovery()) self.assertIn( 'Skip dhclient configuration: No dhclient command found.', self.logs.getvalue()) @mock.patch('cloudinit.temp_utils.os.getuid') @mock.patch('cloudinit.net.dhcp.dhcp_discovery') @mock.patch('cloudinit.net.dhcp.util.which') @mock.patch('cloudinit.net.dhcp.find_fallback_nic') def test_dhclient_run_with_tmpdir(self, m_fback, m_which, m_dhcp, m_uid): """maybe_perform_dhcp_discovery passes tmpdir to dhcp_discovery.""" m_uid.return_value = 0 # Fake root user for tmpdir m_fback.return_value = 'eth9' m_which.return_value = '/sbin/dhclient' m_dhcp.return_value = {'address': '192.168.2.2'} retval = wrap_and_call( 'cloudinit.temp_utils', {'_TMPDIR': {'new': None}, 'os.getuid': 0}, maybe_perform_dhcp_discovery) self.assertEqual({'address': '192.168.2.2'}, retval) self.assertEqual( 1, m_dhcp.call_count, 'dhcp_discovery not called once') call = m_dhcp.call_args_list[0] self.assertEqual('/sbin/dhclient', call[0][0]) self.assertEqual('eth9', call[0][1]) self.assertIn('/var/tmp/cloud-init/cloud-init-dhcp-', call[0][2]) @mock.patch('time.sleep', mock.MagicMock()) @mock.patch('cloudinit.net.dhcp.os.kill') @mock.patch('cloudinit.net.dhcp.util.subp') def test_dhcp_discovery_run_in_sandbox_warns_invalid_pid(self, m_subp, m_kill): """dhcp_discovery logs a warning when pidfile contains invalid content. Lease processing still occurs and no proc kill is attempted. """ tmpdir = self.tmp_dir() dhclient_script = os.path.join(tmpdir, 'dhclient.orig') script_content = '#!/bin/bash\necho fake-dhclient' write_file(dhclient_script, script_content, mode=0o755) write_file(self.tmp_path('dhclient.pid', tmpdir), '') # Empty pid '' lease_content = dedent(""" lease { interface "eth9"; fixed-address 192.168.2.74; option subnet-mask 255.255.255.0; option routers 192.168.2.1; } """) write_file(self.tmp_path('dhcp.leases', tmpdir), lease_content) self.assertItemsEqual( [{'interface': 'eth9', 'fixed-address': '192.168.2.74', 'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}], dhcp_discovery(dhclient_script, 'eth9', tmpdir)) self.assertIn( "dhclient(pid=, parentpid=unknown) failed " "to daemonize after 10.0 seconds", self.logs.getvalue()) m_kill.assert_not_called() @mock.patch('cloudinit.net.dhcp.util.get_proc_ppid') @mock.patch('cloudinit.net.dhcp.os.kill') @mock.patch('cloudinit.net.dhcp.util.wait_for_files') @mock.patch('cloudinit.net.dhcp.util.subp') def test_dhcp_discovery_run_in_sandbox_waits_on_lease_and_pid(self, m_subp, m_wait, m_kill, m_getppid): """dhcp_discovery waits for the presence of pidfile and dhcp.leases.""" tmpdir = self.tmp_dir() dhclient_script = os.path.join(tmpdir, 'dhclient.orig') script_content = '#!/bin/bash\necho fake-dhclient' write_file(dhclient_script, script_content, mode=0o755) # Don't create pid or leases file pidfile = self.tmp_path('dhclient.pid', tmpdir) leasefile = self.tmp_path('dhcp.leases', tmpdir) m_wait.return_value = [pidfile] # Return the missing pidfile wait for m_getppid.return_value = 1 # Indicate that dhclient has daemonized self.assertEqual([], dhcp_discovery(dhclient_script, 'eth9', tmpdir)) self.assertEqual( mock.call([pidfile, leasefile], maxwait=5, naplen=0.01), m_wait.call_args_list[0]) self.assertIn( 'WARNING: dhclient did not produce expected files: dhclient.pid', self.logs.getvalue()) m_kill.assert_not_called() @mock.patch('cloudinit.net.dhcp.util.get_proc_ppid') @mock.patch('cloudinit.net.dhcp.os.kill') @mock.patch('cloudinit.net.dhcp.util.subp') def test_dhcp_discovery_run_in_sandbox(self, m_subp, m_kill, m_getppid): """dhcp_discovery brings up the interface and runs dhclient. It also returns the parsed dhcp.leases file generated in the sandbox. """ tmpdir = self.tmp_dir() dhclient_script = os.path.join(tmpdir, 'dhclient.orig') script_content = '#!/bin/bash\necho fake-dhclient' write_file(dhclient_script, script_content, mode=0o755) lease_content = dedent(""" lease { interface "eth9"; fixed-address 192.168.2.74; option subnet-mask 255.255.255.0; option routers 192.168.2.1; } """) lease_file = os.path.join(tmpdir, 'dhcp.leases') write_file(lease_file, lease_content) pid_file = os.path.join(tmpdir, 'dhclient.pid') my_pid = 1 write_file(pid_file, "%d\n" % my_pid) m_getppid.return_value = 1 # Indicate that dhclient has daemonized self.assertItemsEqual( [{'interface': 'eth9', 'fixed-address': '192.168.2.74', 'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}], dhcp_discovery(dhclient_script, 'eth9', tmpdir)) # dhclient script got copied with open(os.path.join(tmpdir, 'dhclient')) as stream: self.assertEqual(script_content, stream.read()) # Interface was brought up before dhclient called from sandbox m_subp.assert_has_calls([ mock.call( ['ip', 'link', 'set', 'dev', 'eth9', 'up'], capture=True), mock.call( [os.path.join(tmpdir, 'dhclient'), '-1', '-v', '-lf', lease_file, '-pf', os.path.join(tmpdir, 'dhclient.pid'), 'eth9', '-sf', '/bin/true'], capture=True)]) m_kill.assert_has_calls([mock.call(my_pid, signal.SIGKILL)]) class TestSystemdParseLeases(CiTestCase): lxd_lease = dedent("""\ # This is private data. Do not parse. ADDRESS=10.75.205.242 NETMASK=255.255.255.0 ROUTER=10.75.205.1 SERVER_ADDRESS=10.75.205.1 NEXT_SERVER=10.75.205.1 BROADCAST=10.75.205.255 T1=1580 T2=2930 LIFETIME=3600 DNS=10.75.205.1 DOMAINNAME=lxd HOSTNAME=a1 CLIENTID=ffe617693400020000ab110c65a6a0866931c2 """) lxd_parsed = { 'ADDRESS': '10.75.205.242', 'NETMASK': '255.255.255.0', 'ROUTER': '10.75.205.1', 'SERVER_ADDRESS': '10.75.205.1', 'NEXT_SERVER': '10.75.205.1', 'BROADCAST': '10.75.205.255', 'T1': '1580', 'T2': '2930', 'LIFETIME': '3600', 'DNS': '10.75.205.1', 'DOMAINNAME': 'lxd', 'HOSTNAME': 'a1', 'CLIENTID': 'ffe617693400020000ab110c65a6a0866931c2', } azure_lease = dedent("""\ # This is private data. Do not parse. ADDRESS=10.132.0.5 NETMASK=255.255.255.255 ROUTER=10.132.0.1 SERVER_ADDRESS=169.254.169.254 NEXT_SERVER=10.132.0.1 MTU=1460 T1=43200 T2=75600 LIFETIME=86400 DNS=169.254.169.254 NTP=169.254.169.254 DOMAINNAME=c.ubuntu-foundations.internal DOMAIN_SEARCH_LIST=c.ubuntu-foundations.internal google.internal HOSTNAME=tribaal-test-171002-1349.c.ubuntu-foundations.internal ROUTES=10.132.0.1/32,0.0.0.0 0.0.0.0/0,10.132.0.1 CLIENTID=ff405663a200020000ab11332859494d7a8b4c OPTION_245=624c3620 """) azure_parsed = { 'ADDRESS': '10.132.0.5', 'NETMASK': '255.255.255.255', 'ROUTER': '10.132.0.1', 'SERVER_ADDRESS': '169.254.169.254', 'NEXT_SERVER': '10.132.0.1', 'MTU': '1460', 'T1': '43200', 'T2': '75600', 'LIFETIME': '86400', 'DNS': '169.254.169.254', 'NTP': '169.254.169.254', 'DOMAINNAME': 'c.ubuntu-foundations.internal', 'DOMAIN_SEARCH_LIST': 'c.ubuntu-foundations.internal google.internal', 'HOSTNAME': 'tribaal-test-171002-1349.c.ubuntu-foundations.internal', 'ROUTES': '10.132.0.1/32,0.0.0.0 0.0.0.0/0,10.132.0.1', 'CLIENTID': 'ff405663a200020000ab11332859494d7a8b4c', 'OPTION_245': '624c3620'} def setUp(self): super(TestSystemdParseLeases, self).setUp() self.lease_d = self.tmp_dir() def test_no_leases_returns_empty_dict(self): """A leases dir with no lease files should return empty dictionary.""" self.assertEqual({}, networkd_load_leases(self.lease_d)) def test_no_leases_dir_returns_empty_dict(self): """A non-existing leases dir should return empty dict.""" enodir = os.path.join(self.lease_d, 'does-not-exist') self.assertEqual({}, networkd_load_leases(enodir)) def test_single_leases_file(self): """A leases dir with one leases file.""" populate_dir(self.lease_d, {'2': self.lxd_lease}) self.assertEqual( {'2': self.lxd_parsed}, networkd_load_leases(self.lease_d)) def test_single_azure_leases_file(self): """On Azure, option 245 should be present, verify it specifically.""" populate_dir(self.lease_d, {'1': self.azure_lease}) self.assertEqual( {'1': self.azure_parsed}, networkd_load_leases(self.lease_d)) def test_multiple_files(self): """Multiple leases files on azure with one found return that value.""" self.maxDiff = None populate_dir(self.lease_d, {'1': self.azure_lease, '9': self.lxd_lease}) self.assertEqual({'1': self.azure_parsed, '9': self.lxd_parsed}, networkd_load_leases(self.lease_d)) class TestEphemeralDhcpNoNetworkSetup(HttprettyTestCase): @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') def test_ephemeral_dhcp_no_network_if_url_connectivity(self, m_dhcp): """No EphemeralDhcp4 network setup when connectivity_url succeeds.""" url = 'http://example.org/index.html' httpretty.register_uri(httpretty.GET, url) with net.dhcp.EphemeralDHCPv4(connectivity_url=url) as lease: self.assertIsNone(lease) # Ensure that no teardown happens: m_dhcp.assert_not_called() @mock.patch('cloudinit.net.dhcp.util.subp') @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') def test_ephemeral_dhcp_setup_network_if_url_connectivity( self, m_dhcp, m_subp): """No EphemeralDhcp4 network setup when connectivity_url succeeds.""" url = 'http://example.org/index.html' fake_lease = { 'interface': 'eth9', 'fixed-address': '192.168.2.2', 'subnet-mask': '255.255.0.0'} m_dhcp.return_value = [fake_lease] m_subp.return_value = ('', '') httpretty.register_uri(httpretty.GET, url, body={}, status=404) with net.dhcp.EphemeralDHCPv4(connectivity_url=url) as lease: self.assertEqual(fake_lease, lease) # Ensure that dhcp discovery occurs m_dhcp.called_once_with() # vi: ts=4 expandtab