From 039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 Mon Sep 17 00:00:00 2001 From: Brett Holman Date: Fri, 3 Dec 2021 13:11:46 -0700 Subject: Reorganize unit test locations under tests/unittests (#1126) This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs --- cloudinit/sources/helpers/tests/test_netlink.py | 480 ------------- cloudinit/sources/helpers/tests/test_openstack.py | 49 -- cloudinit/sources/tests/__init__.py | 0 cloudinit/sources/tests/test_init.py | 771 --------------------- cloudinit/sources/tests/test_lxd.py | 376 ---------- cloudinit/sources/tests/test_oracle.py | 797 ---------------------- 6 files changed, 2473 deletions(-) delete mode 100644 cloudinit/sources/helpers/tests/test_netlink.py delete mode 100644 cloudinit/sources/helpers/tests/test_openstack.py delete mode 100644 cloudinit/sources/tests/__init__.py delete mode 100644 cloudinit/sources/tests/test_init.py delete mode 100644 cloudinit/sources/tests/test_lxd.py delete mode 100644 cloudinit/sources/tests/test_oracle.py (limited to 'cloudinit/sources') diff --git a/cloudinit/sources/helpers/tests/test_netlink.py b/cloudinit/sources/helpers/tests/test_netlink.py deleted file mode 100644 index cafe3961..00000000 --- a/cloudinit/sources/helpers/tests/test_netlink.py +++ /dev/null @@ -1,480 +0,0 @@ -# Author: Tamilmani Manoharan -# -# This file is part of cloud-init. See LICENSE file for license information. - -from cloudinit.tests.helpers import CiTestCase, mock -import socket -import struct -import codecs -from cloudinit.sources.helpers.netlink import ( - NetlinkCreateSocketError, create_bound_netlink_socket, read_netlink_socket, - read_rta_oper_state, unpack_rta_attr, wait_for_media_disconnect_connect, - wait_for_nic_attach_event, wait_for_nic_detach_event, - OPER_DOWN, OPER_UP, OPER_DORMANT, OPER_LOWERLAYERDOWN, OPER_NOTPRESENT, - OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_DELLINK, - RTM_SETLINK, RTM_GETLINK, MAX_SIZE) - - -def int_to_bytes(i): - '''convert integer to binary: eg: 1 to \x01''' - hex_value = '{0:x}'.format(i) - hex_value = '0' * (len(hex_value) % 2) + hex_value - return codecs.decode(hex_value, 'hex_codec') - - -class TestCreateBoundNetlinkSocket(CiTestCase): - - @mock.patch('cloudinit.sources.helpers.netlink.socket.socket') - def test_socket_error_on_create(self, m_socket): - '''create_bound_netlink_socket catches socket creation exception''' - - """NetlinkCreateSocketError is raised when socket creation errors.""" - m_socket.side_effect = socket.error("Fake socket failure") - with self.assertRaises(NetlinkCreateSocketError) as ctx_mgr: - create_bound_netlink_socket() - self.assertEqual( - 'Exception during netlink socket create: Fake socket failure', - str(ctx_mgr.exception)) - - -class TestReadNetlinkSocket(CiTestCase): - - @mock.patch('cloudinit.sources.helpers.netlink.socket.socket') - @mock.patch('cloudinit.sources.helpers.netlink.select.select') - def test_read_netlink_socket(self, m_select, m_socket): - '''read_netlink_socket able to receive data''' - data = 'netlinktest' - m_select.return_value = [m_socket], None, None - m_socket.recv.return_value = data - recv_data = read_netlink_socket(m_socket, 2) - m_select.assert_called_with([m_socket], [], [], 2) - m_socket.recv.assert_called_with(MAX_SIZE) - self.assertIsNotNone(recv_data) - self.assertEqual(recv_data, data) - - @mock.patch('cloudinit.sources.helpers.netlink.socket.socket') - @mock.patch('cloudinit.sources.helpers.netlink.select.select') - def test_netlink_read_timeout(self, m_select, m_socket): - '''read_netlink_socket should timeout if nothing to read''' - m_select.return_value = [], None, None - data = read_netlink_socket(m_socket, 1) - m_select.assert_called_with([m_socket], [], [], 1) - self.assertEqual(m_socket.recv.call_count, 0) - self.assertIsNone(data) - - def test_read_invalid_socket(self): - '''read_netlink_socket raises assert error if socket is invalid''' - socket = None - with self.assertRaises(AssertionError) as context: - read_netlink_socket(socket, 1) - self.assertTrue('netlink socket is none' in str(context.exception)) - - -class TestParseNetlinkMessage(CiTestCase): - - def test_read_rta_oper_state(self): - '''read_rta_oper_state could parse netlink message and extract data''' - ifname = "eth0" - bytes = ifname.encode("utf-8") - buf = bytearray(48) - struct.pack_into("HH4sHHc", buf, RTATTR_START_OFFSET, 8, 3, bytes, 5, - 16, int_to_bytes(OPER_DOWN)) - interface_state = read_rta_oper_state(buf) - self.assertEqual(interface_state.ifname, ifname) - self.assertEqual(interface_state.operstate, OPER_DOWN) - - def test_read_none_data(self): - '''read_rta_oper_state raises assert error if data is none''' - data = None - with self.assertRaises(AssertionError) as context: - read_rta_oper_state(data) - self.assertEqual('data is none', str(context.exception)) - - def test_read_invalid_rta_operstate_none(self): - '''read_rta_oper_state returns none if operstate is none''' - ifname = "eth0" - buf = bytearray(40) - bytes = ifname.encode("utf-8") - struct.pack_into("HH4s", buf, RTATTR_START_OFFSET, 8, 3, bytes) - interface_state = read_rta_oper_state(buf) - self.assertIsNone(interface_state) - - def test_read_invalid_rta_ifname_none(self): - '''read_rta_oper_state returns none if ifname is none''' - buf = bytearray(40) - struct.pack_into("HHc", buf, RTATTR_START_OFFSET, 5, 16, - int_to_bytes(OPER_DOWN)) - interface_state = read_rta_oper_state(buf) - self.assertIsNone(interface_state) - - def test_read_invalid_data_len(self): - '''raise assert error if data size is smaller than required size''' - buf = bytearray(32) - with self.assertRaises(AssertionError) as context: - read_rta_oper_state(buf) - self.assertTrue('length of data is smaller than RTATTR_START_OFFSET' in - str(context.exception)) - - def test_unpack_rta_attr_none_data(self): - '''unpack_rta_attr raises assert error if data is none''' - data = None - with self.assertRaises(AssertionError) as context: - unpack_rta_attr(data, RTATTR_START_OFFSET) - self.assertTrue('data is none' in str(context.exception)) - - def test_unpack_rta_attr_invalid_offset(self): - '''unpack_rta_attr raises assert error if offset is invalid''' - data = bytearray(48) - with self.assertRaises(AssertionError) as context: - unpack_rta_attr(data, "offset") - self.assertTrue('offset is not integer' in str(context.exception)) - with self.assertRaises(AssertionError) as context: - unpack_rta_attr(data, 31) - self.assertTrue('rta offset is less than expected length' in - str(context.exception)) - - -@mock.patch('cloudinit.sources.helpers.netlink.socket.socket') -@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket') -class TestNicAttachDetach(CiTestCase): - with_logs = True - - def _media_switch_data(self, ifname, msg_type, operstate): - '''construct netlink data with specified fields''' - if ifname and operstate is not None: - data = bytearray(48) - bytes = ifname.encode("utf-8") - struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3, - bytes, 5, 16, int_to_bytes(operstate)) - elif ifname: - data = bytearray(40) - bytes = ifname.encode("utf-8") - struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes) - elif operstate: - data = bytearray(40) - struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16, - int_to_bytes(operstate)) - struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0) - return data - - def test_nic_attached_oper_down(self, m_read_netlink_socket, m_socket): - '''Test for a new nic attached''' - ifname = "eth0" - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - m_read_netlink_socket.side_effect = [data_op_down] - ifread = wait_for_nic_attach_event(m_socket, []) - self.assertEqual(m_read_netlink_socket.call_count, 1) - self.assertEqual(ifname, ifread) - - def test_nic_attached_oper_up(self, m_read_netlink_socket, m_socket): - '''Test for a new nic attached''' - ifname = "eth0" - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - m_read_netlink_socket.side_effect = [data_op_up] - ifread = wait_for_nic_attach_event(m_socket, []) - self.assertEqual(m_read_netlink_socket.call_count, 1) - self.assertEqual(ifname, ifread) - - def test_nic_attach_ignore_existing(self, m_read_netlink_socket, m_socket): - '''Test that we read only the interfaces we are interested in.''' - data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN) - data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN) - m_read_netlink_socket.side_effect = [data_eth0, data_eth1] - ifread = wait_for_nic_attach_event(m_socket, ["eth0"]) - self.assertEqual(m_read_netlink_socket.call_count, 2) - self.assertEqual("eth1", ifread) - - def test_nic_attach_read_first(self, m_read_netlink_socket, m_socket): - '''Test that we read only the interfaces we are interested in.''' - data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN) - data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN) - m_read_netlink_socket.side_effect = [data_eth0, data_eth1] - ifread = wait_for_nic_attach_event(m_socket, ["eth1"]) - self.assertEqual(m_read_netlink_socket.call_count, 1) - self.assertEqual("eth0", ifread) - - def test_nic_detached(self, m_read_netlink_socket, m_socket): - '''Test for an existing nic detached''' - ifname = "eth0" - data_op_down = self._media_switch_data(ifname, RTM_DELLINK, OPER_DOWN) - m_read_netlink_socket.side_effect = [data_op_down] - ifread = wait_for_nic_detach_event(m_socket) - self.assertEqual(m_read_netlink_socket.call_count, 1) - self.assertEqual(ifname, ifread) - - -@mock.patch('cloudinit.sources.helpers.netlink.socket.socket') -@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket') -class TestWaitForMediaDisconnectConnect(CiTestCase): - with_logs = True - - def _media_switch_data(self, ifname, msg_type, operstate): - '''construct netlink data with specified fields''' - if ifname and operstate is not None: - data = bytearray(48) - bytes = ifname.encode("utf-8") - struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3, - bytes, 5, 16, int_to_bytes(operstate)) - elif ifname: - data = bytearray(40) - bytes = ifname.encode("utf-8") - struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes) - elif operstate: - data = bytearray(40) - struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16, - int_to_bytes(operstate)) - struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0) - return data - - def test_media_down_up_scenario(self, m_read_netlink_socket, - m_socket): - '''Test for media down up sequence for required interface name''' - ifname = "eth0" - # construct data for Oper State down - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - # construct data for Oper State up - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - m_read_netlink_socket.side_effect = [data_op_down, data_op_up] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 2) - - def test_wait_for_media_switch_diff_interface(self, m_read_netlink_socket, - m_socket): - '''wait_for_media_disconnect_connect ignores unexpected interfaces. - - The first two messages are for other interfaces and last two are for - expected interface. So the function exit only after receiving last - 2 messages and therefore the call count for m_read_netlink_socket - has to be 4 - ''' - other_ifname = "eth1" - expected_ifname = "eth0" - data_op_down_eth1 = self._media_switch_data( - other_ifname, RTM_NEWLINK, OPER_DOWN - ) - data_op_up_eth1 = self._media_switch_data( - other_ifname, RTM_NEWLINK, OPER_UP - ) - data_op_down_eth0 = self._media_switch_data( - expected_ifname, RTM_NEWLINK, OPER_DOWN - ) - data_op_up_eth0 = self._media_switch_data( - expected_ifname, RTM_NEWLINK, OPER_UP) - m_read_netlink_socket.side_effect = [ - data_op_down_eth1, - data_op_up_eth1, - data_op_down_eth0, - data_op_up_eth0 - ] - wait_for_media_disconnect_connect(m_socket, expected_ifname) - self.assertIn('Ignored netlink event on interface %s' % other_ifname, - self.logs.getvalue()) - self.assertEqual(m_read_netlink_socket.call_count, 4) - - def test_invalid_msgtype_getlink(self, m_read_netlink_socket, m_socket): - '''wait_for_media_disconnect_connect ignores GETLINK events. - - The first two messages are for oper down and up for RTM_GETLINK type - which netlink module will ignore. The last 2 messages are RTM_NEWLINK - with oper state down and up messages. Therefore the call count for - m_read_netlink_socket has to be 4 ignoring first 2 messages - of RTM_GETLINK - ''' - ifname = "eth0" - data_getlink_down = self._media_switch_data( - ifname, RTM_GETLINK, OPER_DOWN - ) - data_getlink_up = self._media_switch_data( - ifname, RTM_GETLINK, OPER_UP - ) - data_newlink_down = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_DOWN - ) - data_newlink_up = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_UP - ) - m_read_netlink_socket.side_effect = [ - data_getlink_down, - data_getlink_up, - data_newlink_down, - data_newlink_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 4) - - def test_invalid_msgtype_setlink(self, m_read_netlink_socket, m_socket): - '''wait_for_media_disconnect_connect ignores SETLINK events. - - The first two messages are for oper down and up for RTM_GETLINK type - which it will ignore. 3rd and 4th messages are RTM_NEWLINK with down - and up messages. This function should exit after 4th messages since it - sees down->up scenario. So the call count for m_read_netlink_socket - has to be 4 ignoring first 2 messages of RTM_GETLINK and - last 2 messages of RTM_NEWLINK - ''' - ifname = "eth0" - data_setlink_down = self._media_switch_data( - ifname, RTM_SETLINK, OPER_DOWN - ) - data_setlink_up = self._media_switch_data( - ifname, RTM_SETLINK, OPER_UP - ) - data_newlink_down = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_DOWN - ) - data_newlink_up = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_UP - ) - m_read_netlink_socket.side_effect = [ - data_setlink_down, - data_setlink_up, - data_newlink_down, - data_newlink_up, - data_newlink_down, - data_newlink_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 4) - - def test_netlink_invalid_switch_scenario(self, m_read_netlink_socket, - m_socket): - '''returns only if it receives UP event after a DOWN event''' - ifname = "eth0" - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - data_op_dormant = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_DORMANT - ) - data_op_notpresent = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_NOTPRESENT - ) - data_op_lowerdown = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_LOWERLAYERDOWN - ) - data_op_testing = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_TESTING - ) - data_op_unknown = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_UNKNOWN - ) - m_read_netlink_socket.side_effect = [ - data_op_up, data_op_up, - data_op_dormant, data_op_up, - data_op_notpresent, data_op_up, - data_op_lowerdown, data_op_up, - data_op_testing, data_op_up, - data_op_unknown, data_op_up, - data_op_down, data_op_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 14) - - def test_netlink_valid_inbetween_transitions(self, m_read_netlink_socket, - m_socket): - '''wait_for_media_disconnect_connect handles in between transitions''' - ifname = "eth0" - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - data_op_dormant = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_DORMANT) - data_op_unknown = self._media_switch_data( - ifname, RTM_NEWLINK, OPER_UNKNOWN) - m_read_netlink_socket.side_effect = [ - data_op_down, data_op_dormant, - data_op_unknown, data_op_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 4) - - def test_netlink_invalid_operstate(self, m_read_netlink_socket, m_socket): - '''wait_for_media_disconnect_connect should handle invalid operstates. - - The function should not fail and return even if it receives invalid - operstates. It always should wait for down up sequence. - ''' - ifname = "eth0" - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - data_op_invalid = self._media_switch_data(ifname, RTM_NEWLINK, 7) - m_read_netlink_socket.side_effect = [ - data_op_invalid, data_op_up, - data_op_down, data_op_invalid, - data_op_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 5) - - def test_wait_invalid_socket(self, m_read_netlink_socket, m_socket): - '''wait_for_media_disconnect_connect handle none netlink socket.''' - socket = None - ifname = "eth0" - with self.assertRaises(AssertionError) as context: - wait_for_media_disconnect_connect(socket, ifname) - self.assertTrue('netlink socket is none' in str(context.exception)) - - def test_wait_invalid_ifname(self, m_read_netlink_socket, m_socket): - '''wait_for_media_disconnect_connect handle none interface name''' - ifname = None - with self.assertRaises(AssertionError) as context: - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertTrue('interface name is none' in str(context.exception)) - ifname = "" - with self.assertRaises(AssertionError) as context: - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertTrue('interface name cannot be empty' in - str(context.exception)) - - def test_wait_invalid_rta_attr(self, m_read_netlink_socket, m_socket): - ''' wait_for_media_disconnect_connect handles invalid rta data''' - ifname = "eth0" - data_invalid1 = self._media_switch_data(None, RTM_NEWLINK, OPER_DOWN) - data_invalid2 = self._media_switch_data(ifname, RTM_NEWLINK, None) - data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) - data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) - m_read_netlink_socket.side_effect = [ - data_invalid1, data_invalid2, data_op_down, data_op_up - ] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 4) - - def test_read_multiple_netlink_msgs(self, m_read_netlink_socket, m_socket): - '''Read multiple messages in single receive call''' - ifname = "eth0" - bytes = ifname.encode("utf-8") - data = bytearray(96) - struct.pack_into("=LHHLL", data, 0, 48, RTM_NEWLINK, 0, 0, 0) - struct.pack_into( - "HH4sHHc", data, RTATTR_START_OFFSET, 8, 3, - bytes, 5, 16, int_to_bytes(OPER_DOWN) - ) - struct.pack_into("=LHHLL", data, 48, 48, RTM_NEWLINK, 0, 0, 0) - struct.pack_into( - "HH4sHHc", data, 48 + RTATTR_START_OFFSET, 8, - 3, bytes, 5, 16, int_to_bytes(OPER_UP) - ) - m_read_netlink_socket.return_value = data - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 1) - - def test_read_partial_netlink_msgs(self, m_read_netlink_socket, m_socket): - '''Read partial messages in receive call''' - ifname = "eth0" - bytes = ifname.encode("utf-8") - data1 = bytearray(112) - data2 = bytearray(32) - struct.pack_into("=LHHLL", data1, 0, 48, RTM_NEWLINK, 0, 0, 0) - struct.pack_into( - "HH4sHHc", data1, RTATTR_START_OFFSET, 8, 3, - bytes, 5, 16, int_to_bytes(OPER_DOWN) - ) - struct.pack_into("=LHHLL", data1, 48, 48, RTM_NEWLINK, 0, 0, 0) - struct.pack_into( - "HH4sHHc", data1, 80, 8, 3, bytes, 5, 16, int_to_bytes(OPER_DOWN) - ) - struct.pack_into("=LHHLL", data1, 96, 48, RTM_NEWLINK, 0, 0, 0) - struct.pack_into( - "HH4sHHc", data2, 16, 8, 3, bytes, 5, 16, int_to_bytes(OPER_UP) - ) - m_read_netlink_socket.side_effect = [data1, data2] - wait_for_media_disconnect_connect(m_socket, ifname) - self.assertEqual(m_read_netlink_socket.call_count, 2) diff --git a/cloudinit/sources/helpers/tests/test_openstack.py b/cloudinit/sources/helpers/tests/test_openstack.py deleted file mode 100644 index 95fb9743..00000000 --- a/cloudinit/sources/helpers/tests/test_openstack.py +++ /dev/null @@ -1,49 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. -# ./cloudinit/sources/helpers/tests/test_openstack.py -from unittest import mock - -from cloudinit.sources.helpers import openstack -from cloudinit.tests import helpers as test_helpers - - -@mock.patch( - "cloudinit.net.is_openvswitch_internal_interface", - mock.Mock(return_value=False) -) -class TestConvertNetJson(test_helpers.CiTestCase): - - def test_phy_types(self): - """Verify the different known physical types are handled.""" - # network_data.json example from - # https://docs.openstack.org/nova/latest/user/metadata.html - mac0 = "fa:16:3e:9c:bf:3d" - net_json = { - "links": [ - {"ethernet_mac_address": mac0, "id": "tapcd9f6d46-4a", - "mtu": None, "type": "bridge", - "vif_id": "cd9f6d46-4a3a-43ab-a466-994af9db96fc"} - ], - "networks": [ - {"id": "network0", "link": "tapcd9f6d46-4a", - "network_id": "99e88329-f20d-4741-9593-25bf07847b16", - "type": "ipv4_dhcp"} - ], - "services": [{"address": "8.8.8.8", "type": "dns"}] - } - macs = {mac0: 'eth0'} - - expected = { - 'version': 1, - 'config': [ - {'mac_address': 'fa:16:3e:9c:bf:3d', - 'mtu': None, 'name': 'eth0', - 'subnets': [{'type': 'dhcp4'}], - 'type': 'physical'}, - {'address': '8.8.8.8', 'type': 'nameserver'}]} - - for t in openstack.KNOWN_PHYSICAL_TYPES: - net_json["links"][0]["type"] = t - self.assertEqual( - expected, - openstack.convert_net_json(network_json=net_json, - known_macs=macs)) diff --git a/cloudinit/sources/tests/__init__.py b/cloudinit/sources/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/cloudinit/sources/tests/test_init.py b/cloudinit/sources/tests/test_init.py deleted file mode 100644 index ae09cb17..00000000 --- a/cloudinit/sources/tests/test_init.py +++ /dev/null @@ -1,771 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import copy -import inspect -import os -import stat - -from cloudinit.event import EventScope, EventType -from cloudinit.helpers import Paths -from cloudinit import importer -from cloudinit.sources import ( - EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE, - METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource, - canonical_cloud_id, redact_sensitive_keys) -from cloudinit.tests.helpers import CiTestCase, mock -from cloudinit.user_data import UserDataProcessor -from cloudinit import util - - -class DataSourceTestSubclassNet(DataSource): - - dsname = 'MyTestSubclass' - url_max_wait = 55 - - def __init__(self, sys_cfg, distro, paths, custom_metadata=None, - custom_userdata=None, get_data_retval=True): - super(DataSourceTestSubclassNet, self).__init__( - sys_cfg, distro, paths) - self._custom_userdata = custom_userdata - self._custom_metadata = custom_metadata - self._get_data_retval = get_data_retval - - def _get_cloud_name(self): - return 'SubclassCloudName' - - def _get_data(self): - if self._custom_metadata: - self.metadata = self._custom_metadata - else: - self.metadata = {'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion'} - if self._custom_userdata: - self.userdata_raw = self._custom_userdata - else: - self.userdata_raw = 'userdata_raw' - self.vendordata_raw = 'vendordata_raw' - return self._get_data_retval - - -class InvalidDataSourceTestSubclassNet(DataSource): - pass - - -class TestDataSource(CiTestCase): - - with_logs = True - maxDiff = None - - def setUp(self): - super(TestDataSource, self).setUp() - self.sys_cfg = {'datasource': {'_undef': {'key1': False}}} - self.distro = 'distrotest' # generally should be a Distro object - self.paths = Paths({}) - self.datasource = DataSource(self.sys_cfg, self.distro, self.paths) - - def test_datasource_init(self): - """DataSource initializes metadata attributes, ds_cfg and ud_proc.""" - self.assertEqual(self.paths, self.datasource.paths) - self.assertEqual(self.sys_cfg, self.datasource.sys_cfg) - self.assertEqual(self.distro, self.datasource.distro) - self.assertIsNone(self.datasource.userdata) - self.assertEqual({}, self.datasource.metadata) - self.assertIsNone(self.datasource.userdata_raw) - self.assertIsNone(self.datasource.vendordata) - self.assertIsNone(self.datasource.vendordata_raw) - self.assertEqual({'key1': False}, self.datasource.ds_cfg) - self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor) - - def test_datasource_init_gets_ds_cfg_using_dsname(self): - """Init uses DataSource.dsname for sourcing ds_cfg.""" - sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}} - distro = 'distrotest' # generally should be a Distro object - datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths) - self.assertEqual({'key2': False}, datasource.ds_cfg) - - def test_str_is_classname(self): - """The string representation of the datasource is the classname.""" - self.assertEqual('DataSource', str(self.datasource)) - self.assertEqual( - 'DataSourceTestSubclassNet', - str(DataSourceTestSubclassNet('', '', self.paths))) - - def test_datasource_get_url_params_defaults(self): - """get_url_params default url config settings for the datasource.""" - params = self.datasource.get_url_params() - self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait) - self.assertEqual(params.timeout_seconds, self.datasource.url_timeout) - self.assertEqual(params.num_retries, self.datasource.url_retries) - self.assertEqual(params.sec_between_retries, - self.datasource.url_sec_between_retries) - - def test_datasource_get_url_params_subclassed(self): - """Subclasses can override get_url_params defaults.""" - sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}} - distro = 'distrotest' # generally should be a Distro object - datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths) - expected = (datasource.url_max_wait, datasource.url_timeout, - datasource.url_retries, datasource.url_sec_between_retries) - url_params = datasource.get_url_params() - self.assertNotEqual(self.datasource.get_url_params(), url_params) - self.assertEqual(expected, url_params) - - def test_datasource_get_url_params_ds_config_override(self): - """Datasource configuration options can override url param defaults.""" - sys_cfg = { - 'datasource': { - 'MyTestSubclass': { - 'max_wait': '1', 'timeout': '2', - 'retries': '3', 'sec_between_retries': 4 - }}} - datasource = DataSourceTestSubclassNet( - sys_cfg, self.distro, self.paths) - expected = (1, 2, 3, 4) - url_params = datasource.get_url_params() - self.assertNotEqual( - (datasource.url_max_wait, datasource.url_timeout, - datasource.url_retries, datasource.url_sec_between_retries), - url_params) - self.assertEqual(expected, url_params) - - def test_datasource_get_url_params_is_zero_or_greater(self): - """get_url_params ignores timeouts with a value below 0.""" - # Set an override that is below 0 which gets ignored. - sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}} - datasource = DataSource(sys_cfg, self.distro, self.paths) - (_max_wait, timeout, _retries, - _sec_between_retries) = datasource.get_url_params() - self.assertEqual(0, timeout) - - def test_datasource_get_url_uses_defaults_on_errors(self): - """On invalid system config values for url_params defaults are used.""" - # All invalid values should be logged - sys_cfg = {'datasource': { - '_undef': { - 'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}} - datasource = DataSource(sys_cfg, self.distro, self.paths) - url_params = datasource.get_url_params() - expected = (datasource.url_max_wait, datasource.url_timeout, - datasource.url_retries, datasource.url_sec_between_retries) - self.assertEqual(expected, url_params) - logs = self.logs.getvalue() - expected_logs = [ - "Config max_wait 'nope' is not an int, using default '-1'", - "Config timeout 'bug' is not an int, using default '10'", - "Config retries 'nonint' is not an int, using default '5'", - ] - for log in expected_logs: - self.assertIn(log, logs) - - @mock.patch('cloudinit.sources.net.find_fallback_nic') - def test_fallback_interface_is_discovered(self, m_get_fallback_nic): - """The fallback_interface is discovered via find_fallback_nic.""" - m_get_fallback_nic.return_value = 'nic9' - self.assertEqual('nic9', self.datasource.fallback_interface) - - @mock.patch('cloudinit.sources.net.find_fallback_nic') - def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic): - """Log a warning when fallback_interface can not discover the nic.""" - self.datasource._cloud_name = 'MySupahCloud' - m_get_fallback_nic.return_value = None # Couldn't discover nic - self.assertIsNone(self.datasource.fallback_interface) - self.assertEqual( - 'WARNING: Did not find a fallback interface on MySupahCloud.\n', - self.logs.getvalue()) - - @mock.patch('cloudinit.sources.net.find_fallback_nic') - def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic): - """The fallback_interface is cached and won't be rediscovered.""" - self.datasource._fallback_interface = 'nic10' - self.assertEqual('nic10', self.datasource.fallback_interface) - m_get_fallback_nic.assert_not_called() - - def test__get_data_unimplemented(self): - """Raise an error when _get_data is not implemented.""" - with self.assertRaises(NotImplementedError) as context_manager: - self.datasource.get_data() - self.assertIn( - 'Subclasses of DataSource must implement _get_data', - str(context_manager.exception)) - datasource2 = InvalidDataSourceTestSubclassNet( - self.sys_cfg, self.distro, self.paths) - with self.assertRaises(NotImplementedError) as context_manager: - datasource2.get_data() - self.assertIn( - 'Subclasses of DataSource must implement _get_data', - str(context_manager.exception)) - - def test_get_data_calls_subclass__get_data(self): - """Datasource.get_data uses the subclass' version of _get_data.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertTrue(datasource.get_data()) - self.assertEqual( - {'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion'}, - datasource.metadata) - self.assertEqual('userdata_raw', datasource.userdata_raw) - self.assertEqual('vendordata_raw', datasource.vendordata_raw) - - def test_get_hostname_strips_local_hostname_without_domain(self): - """Datasource.get_hostname strips metadata local-hostname of domain.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertTrue(datasource.get_data()) - self.assertEqual( - 'test-subclass-hostname', datasource.metadata['local-hostname']) - self.assertEqual('test-subclass-hostname', datasource.get_hostname()) - datasource.metadata['local-hostname'] = 'hostname.my.domain.com' - self.assertEqual('hostname', datasource.get_hostname()) - - def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self): - """Datasource.get_hostname with fqdn set gets qualified hostname.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertTrue(datasource.get_data()) - datasource.metadata['local-hostname'] = 'hostname.my.domain.com' - self.assertEqual( - 'hostname.my.domain.com', datasource.get_hostname(fqdn=True)) - - def test_get_hostname_without_metadata_uses_system_hostname(self): - """Datasource.gethostname runs util.get_hostname when no metadata.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertEqual({}, datasource.metadata) - mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' - with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: - with mock.patch(mock_fqdn) as m_fqdn: - m_gethost.return_value = 'systemhostname.domain.com' - m_fqdn.return_value = None # No maching fqdn in /etc/hosts - self.assertEqual('systemhostname', datasource.get_hostname()) - self.assertEqual( - 'systemhostname.domain.com', - datasource.get_hostname(fqdn=True)) - - def test_get_hostname_without_metadata_returns_none(self): - """Datasource.gethostname returns None when metadata_only and no MD.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertEqual({}, datasource.metadata) - mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' - with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: - with mock.patch(mock_fqdn) as m_fqdn: - self.assertIsNone(datasource.get_hostname(metadata_only=True)) - self.assertIsNone( - datasource.get_hostname(fqdn=True, metadata_only=True)) - self.assertEqual([], m_gethost.call_args_list) - self.assertEqual([], m_fqdn.call_args_list) - - def test_get_hostname_without_metadata_prefers_etc_hosts(self): - """Datasource.gethostname prefers /etc/hosts to util.get_hostname.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - self.assertEqual({}, datasource.metadata) - mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' - with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: - with mock.patch(mock_fqdn) as m_fqdn: - m_gethost.return_value = 'systemhostname.domain.com' - m_fqdn.return_value = 'fqdnhostname.domain.com' - self.assertEqual('fqdnhostname', datasource.get_hostname()) - self.assertEqual('fqdnhostname.domain.com', - datasource.get_hostname(fqdn=True)) - - def test_get_data_does_not_write_instance_data_on_failure(self): - """get_data does not write INSTANCE_JSON_FILE on get_data False.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp}), - get_data_retval=False) - self.assertFalse(datasource.get_data()) - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - self.assertFalse( - os.path.exists(json_file), 'Found unexpected file %s' % json_file) - - def test_get_data_writes_json_instance_data_on_success(self): - """get_data writes INSTANCE_JSON_FILE to run_dir as world readable.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - sys_info = { - "python": "3.7", - "platform": - "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", - "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", - "x86_64"], - "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} - with mock.patch("cloudinit.util.system_info", return_value=sys_info): - datasource.get_data() - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - content = util.load_file(json_file) - expected = { - 'base64_encoded_keys': [], - 'merged_cfg': REDACT_SENSITIVE_VALUE, - 'sensitive_keys': ['merged_cfg'], - 'sys_info': sys_info, - 'v1': { - '_beta_keys': ['subplatform'], - 'availability-zone': 'myaz', - 'availability_zone': 'myaz', - 'cloud-name': 'subclasscloudname', - 'cloud_name': 'subclasscloudname', - 'distro': 'ubuntu', - 'distro_release': 'focal', - 'distro_version': '20.04', - 'instance-id': 'iid-datasource', - 'instance_id': 'iid-datasource', - 'local-hostname': 'test-subclass-hostname', - 'local_hostname': 'test-subclass-hostname', - 'kernel_release': '5.4.0-24-generic', - 'machine': 'x86_64', - 'platform': 'mytestsubclass', - 'public_ssh_keys': [], - 'python_version': '3.7', - 'region': 'myregion', - 'system_platform': - 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', - 'subplatform': 'unknown', - 'variant': 'ubuntu'}, - 'ds': { - - '_doc': EXPERIMENTAL_TEXT, - 'meta_data': {'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion'}}} - self.assertEqual(expected, util.load_json(content)) - file_stat = os.stat(json_file) - self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) - self.assertEqual(expected, util.load_json(content)) - - def test_get_data_writes_redacted_public_json_instance_data(self): - """get_data writes redacted content to public INSTANCE_JSON_FILE.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp}), - custom_metadata={ - 'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion', - 'some': {'security-credentials': { - 'cred1': 'sekret', 'cred2': 'othersekret'}}}) - self.assertCountEqual( - ('merged_cfg', 'security-credentials',), - datasource.sensitive_metadata_keys) - sys_info = { - "python": "3.7", - "platform": - "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", - "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", - "x86_64"], - "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} - with mock.patch("cloudinit.util.system_info", return_value=sys_info): - datasource.get_data() - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - redacted = util.load_json(util.load_file(json_file)) - expected = { - 'base64_encoded_keys': [], - 'merged_cfg': REDACT_SENSITIVE_VALUE, - 'sensitive_keys': [ - 'ds/meta_data/some/security-credentials', 'merged_cfg'], - 'sys_info': sys_info, - 'v1': { - '_beta_keys': ['subplatform'], - 'availability-zone': 'myaz', - 'availability_zone': 'myaz', - 'cloud-name': 'subclasscloudname', - 'cloud_name': 'subclasscloudname', - 'distro': 'ubuntu', - 'distro_release': 'focal', - 'distro_version': '20.04', - 'instance-id': 'iid-datasource', - 'instance_id': 'iid-datasource', - 'local-hostname': 'test-subclass-hostname', - 'local_hostname': 'test-subclass-hostname', - 'kernel_release': '5.4.0-24-generic', - 'machine': 'x86_64', - 'platform': 'mytestsubclass', - 'public_ssh_keys': [], - 'python_version': '3.7', - 'region': 'myregion', - 'system_platform': - 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', - 'subplatform': 'unknown', - 'variant': 'ubuntu'}, - 'ds': { - '_doc': EXPERIMENTAL_TEXT, - 'meta_data': { - 'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion', - 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}} - } - self.assertCountEqual(expected, redacted) - file_stat = os.stat(json_file) - self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) - - def test_get_data_writes_json_instance_data_sensitive(self): - """ - get_data writes unmodified data to sensitive file as root-readonly. - """ - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp}), - custom_metadata={ - 'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion', - 'some': {'security-credentials': { - 'cred1': 'sekret', 'cred2': 'othersekret'}}}) - sys_info = { - "python": "3.7", - "platform": - "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", - "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", - "x86_64"], - "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} - - self.assertCountEqual( - ('merged_cfg', 'security-credentials',), - datasource.sensitive_metadata_keys) - with mock.patch("cloudinit.util.system_info", return_value=sys_info): - datasource.get_data() - sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp) - content = util.load_file(sensitive_json_file) - expected = { - 'base64_encoded_keys': [], - 'merged_cfg': { - '_doc': ( - 'Merged cloud-init system config from ' - '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/' - ), - 'datasource': {'_undef': {'key1': False}}}, - 'sensitive_keys': [ - 'ds/meta_data/some/security-credentials', 'merged_cfg'], - 'sys_info': sys_info, - 'v1': { - '_beta_keys': ['subplatform'], - 'availability-zone': 'myaz', - 'availability_zone': 'myaz', - 'cloud-name': 'subclasscloudname', - 'cloud_name': 'subclasscloudname', - 'distro': 'ubuntu', - 'distro_release': 'focal', - 'distro_version': '20.04', - 'instance-id': 'iid-datasource', - 'instance_id': 'iid-datasource', - 'kernel_release': '5.4.0-24-generic', - 'local-hostname': 'test-subclass-hostname', - 'local_hostname': 'test-subclass-hostname', - 'machine': 'x86_64', - 'platform': 'mytestsubclass', - 'public_ssh_keys': [], - 'python_version': '3.7', - 'region': 'myregion', - 'subplatform': 'unknown', - 'system_platform': - 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', - 'variant': 'ubuntu'}, - 'ds': { - '_doc': EXPERIMENTAL_TEXT, - 'meta_data': { - 'availability_zone': 'myaz', - 'local-hostname': 'test-subclass-hostname', - 'region': 'myregion', - 'some': { - 'security-credentials': - {'cred1': 'sekret', 'cred2': 'othersekret'}}}} - } - self.assertCountEqual(expected, util.load_json(content)) - file_stat = os.stat(sensitive_json_file) - self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode)) - self.assertEqual(expected, util.load_json(content)) - - def test_get_data_handles_redacted_unserializable_content(self): - """get_data warns unserializable content in INSTANCE_JSON_FILE.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp}), - custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}}) - datasource.get_data() - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - content = util.load_file(json_file) - expected_metadata = { - 'key1': 'val1', - 'key2': { - 'key2.1': "Warning: redacted unserializable type "}} - instance_json = util.load_json(content) - self.assertEqual( - expected_metadata, instance_json['ds']['meta_data']) - - def test_persist_instance_data_writes_ec2_metadata_when_set(self): - """When ec2_metadata class attribute is set, persist to json.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - datasource.ec2_metadata = UNSET - datasource.get_data() - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - instance_data = util.load_json(util.load_file(json_file)) - self.assertNotIn('ec2_metadata', instance_data['ds']) - datasource.ec2_metadata = {'ec2stuff': 'is good'} - datasource.persist_instance_data() - instance_data = util.load_json(util.load_file(json_file)) - self.assertEqual( - {'ec2stuff': 'is good'}, - instance_data['ds']['ec2_metadata']) - - def test_persist_instance_data_writes_network_json_when_set(self): - """When network_data.json class attribute is set, persist to json.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp})) - datasource.get_data() - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - instance_data = util.load_json(util.load_file(json_file)) - self.assertNotIn('network_json', instance_data['ds']) - datasource.network_json = {'network_json': 'is good'} - datasource.persist_instance_data() - instance_data = util.load_json(util.load_file(json_file)) - self.assertEqual( - {'network_json': 'is good'}, - instance_data['ds']['network_json']) - - def test_get_data_base64encodes_unserializable_bytes(self): - """On py3, get_data base64encodes any unserializable content.""" - tmp = self.tmp_dir() - datasource = DataSourceTestSubclassNet( - self.sys_cfg, self.distro, Paths({'run_dir': tmp}), - custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}}) - self.assertTrue(datasource.get_data()) - json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) - content = util.load_file(json_file) - instance_json = util.load_json(content) - self.assertCountEqual( - ['ds/meta_data/key2/key2.1'], - instance_json['base64_encoded_keys']) - self.assertEqual( - {'key1': 'val1', 'key2': {'key2.1': 'EjM='}}, - instance_json['ds']['meta_data']) - - def test_get_hostname_subclass_support(self): - """Validate get_hostname signature on all subclasses of DataSource.""" - base_args = inspect.getfullargspec(DataSource.get_hostname) - # Import all DataSource subclasses so we can inspect them. - modules = util.find_modules(os.path.dirname(os.path.dirname(__file__))) - for _loc, name in modules.items(): - mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], []) - if mod_locs: - importer.import_module(mod_locs[0]) - for child in DataSource.__subclasses__(): - if 'Test' in child.dsname: - continue - self.assertEqual( - base_args, - inspect.getfullargspec(child.get_hostname), - '%s does not implement DataSource.get_hostname params' - % child) - for grandchild in child.__subclasses__(): - self.assertEqual( - base_args, - inspect.getfullargspec(grandchild.get_hostname), - '%s does not implement DataSource.get_hostname params' - % grandchild) - - def test_clear_cached_attrs_resets_cached_attr_class_attributes(self): - """Class attributes listed in cached_attr_defaults are reset.""" - count = 0 - # Setup values for all cached class attributes - for attr, value in self.datasource.cached_attr_defaults: - setattr(self.datasource, attr, count) - count += 1 - self.datasource._dirty_cache = True - self.datasource.clear_cached_attrs() - for attr, value in self.datasource.cached_attr_defaults: - self.assertEqual(value, getattr(self.datasource, attr)) - - def test_clear_cached_attrs_noops_on_clean_cache(self): - """Class attributes listed in cached_attr_defaults are reset.""" - count = 0 - # Setup values for all cached class attributes - for attr, _ in self.datasource.cached_attr_defaults: - setattr(self.datasource, attr, count) - count += 1 - self.datasource._dirty_cache = False # Fake clean cache - self.datasource.clear_cached_attrs() - count = 0 - for attr, _ in self.datasource.cached_attr_defaults: - self.assertEqual(count, getattr(self.datasource, attr)) - count += 1 - - def test_clear_cached_attrs_skips_non_attr_class_attributes(self): - """Skip any cached_attr_defaults which aren't class attributes.""" - self.datasource._dirty_cache = True - self.datasource.clear_cached_attrs() - for attr in ('ec2_metadata', 'network_json'): - self.assertFalse(hasattr(self.datasource, attr)) - - def test_clear_cached_attrs_of_custom_attrs(self): - """Custom attr_values can be passed to clear_cached_attrs.""" - self.datasource._dirty_cache = True - cached_attr_name = self.datasource.cached_attr_defaults[0][0] - setattr(self.datasource, cached_attr_name, 'himom') - self.datasource.myattr = 'orig' - self.datasource.clear_cached_attrs( - attr_defaults=(('myattr', 'updated'),)) - self.assertEqual('himom', getattr(self.datasource, cached_attr_name)) - self.assertEqual('updated', self.datasource.myattr) - - @mock.patch.dict(DataSource.default_update_events, { - EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) - @mock.patch.dict(DataSource.supported_update_events, { - EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) - def test_update_metadata_only_acts_on_supported_update_events(self): - """update_metadata_if_supported wont get_data on unsupported events.""" - self.assertEqual( - {EventScope.NETWORK: set([EventType.BOOT_NEW_INSTANCE])}, - self.datasource.default_update_events - ) - - def fake_get_data(): - raise Exception('get_data should not be called') - - self.datasource.get_data = fake_get_data - self.assertFalse( - self.datasource.update_metadata_if_supported( - source_event_types=[EventType.BOOT])) - - @mock.patch.dict(DataSource.supported_update_events, { - EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) - def test_update_metadata_returns_true_on_supported_update_event(self): - """update_metadata_if_supported returns get_data on supported events""" - def fake_get_data(): - return True - - self.datasource.get_data = fake_get_data - self.datasource._network_config = 'something' - self.datasource._dirty_cache = True - self.assertTrue( - self.datasource.update_metadata_if_supported( - source_event_types=[ - EventType.BOOT, EventType.BOOT_NEW_INSTANCE])) - self.assertEqual(UNSET, self.datasource._network_config) - - self.assertIn( - "DEBUG: Update datasource metadata and network config due to" - " events: boot-new-instance", - self.logs.getvalue() - ) - - -class TestRedactSensitiveData(CiTestCase): - - def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self): - """When sensitive_keys is absent or empty from metadata do nothing.""" - md = {'my': 'data'} - self.assertEqual( - md, redact_sensitive_keys(md, redact_value='redacted')) - md['sensitive_keys'] = [] - self.assertEqual( - md, redact_sensitive_keys(md, redact_value='redacted')) - - def test_redact_sensitive_data_redacts_exact_match_name(self): - """Only exact matched sensitive_keys are redacted from metadata.""" - md = {'sensitive_keys': ['md/secure'], - 'md': {'secure': 's3kr1t', 'insecure': 'publik'}} - secure_md = copy.deepcopy(md) - secure_md['md']['secure'] = 'redacted' - self.assertEqual( - secure_md, - redact_sensitive_keys(md, redact_value='redacted')) - - def test_redact_sensitive_data_does_redacts_with_default_string(self): - """When redact_value is absent, REDACT_SENSITIVE_VALUE is used.""" - md = {'sensitive_keys': ['md/secure'], - 'md': {'secure': 's3kr1t', 'insecure': 'publik'}} - secure_md = copy.deepcopy(md) - secure_md['md']['secure'] = 'redacted for non-root user' - self.assertEqual( - secure_md, - redact_sensitive_keys(md)) - - -class TestCanonicalCloudID(CiTestCase): - - def test_cloud_id_returns_platform_on_unknowns(self): - """When region and cloud_name are unknown, return platform.""" - self.assertEqual( - 'platform', - canonical_cloud_id(cloud_name=METADATA_UNKNOWN, - region=METADATA_UNKNOWN, - platform='platform')) - - def test_cloud_id_returns_platform_on_none(self): - """When region and cloud_name are unknown, return platform.""" - self.assertEqual( - 'platform', - canonical_cloud_id(cloud_name=None, - region=None, - platform='platform')) - - def test_cloud_id_returns_cloud_name_on_unknown_region(self): - """When region is unknown, return cloud_name.""" - for region in (None, METADATA_UNKNOWN): - self.assertEqual( - 'cloudname', - canonical_cloud_id(cloud_name='cloudname', - region=region, - platform='platform')) - - def test_cloud_id_returns_platform_on_unknown_cloud_name(self): - """When region is set but cloud_name is unknown return cloud_name.""" - self.assertEqual( - 'platform', - canonical_cloud_id(cloud_name=METADATA_UNKNOWN, - region='region', - platform='platform')) - - def test_cloud_id_aws_based_on_region_and_cloud_name(self): - """When cloud_name is aws, return proper cloud-id based on region.""" - self.assertEqual( - 'aws-china', - canonical_cloud_id(cloud_name='aws', - region='cn-north-1', - platform='platform')) - self.assertEqual( - 'aws', - canonical_cloud_id(cloud_name='aws', - region='us-east-1', - platform='platform')) - self.assertEqual( - 'aws-gov', - canonical_cloud_id(cloud_name='aws', - region='us-gov-1', - platform='platform')) - self.assertEqual( # Overrideen non-aws cloud_name is returned - '!aws', - canonical_cloud_id(cloud_name='!aws', - region='us-gov-1', - platform='platform')) - - def test_cloud_id_azure_based_on_region_and_cloud_name(self): - """Report cloud-id when cloud_name is azure and region is in china.""" - self.assertEqual( - 'azure-china', - canonical_cloud_id(cloud_name='azure', - region='chinaeast', - platform='platform')) - self.assertEqual( - 'azure', - canonical_cloud_id(cloud_name='azure', - region='!chinaeast', - platform='platform')) - -# vi: ts=4 expandtab diff --git a/cloudinit/sources/tests/test_lxd.py b/cloudinit/sources/tests/test_lxd.py deleted file mode 100644 index a6e51f3b..00000000 --- a/cloudinit/sources/tests/test_lxd.py +++ /dev/null @@ -1,376 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from collections import namedtuple -from copy import deepcopy -import json -import re -import stat -from unittest import mock -import yaml - -import pytest - -from cloudinit.sources import ( - DataSourceLXD as lxd, InvalidMetaDataException, UNSET -) -DS_PATH = "cloudinit.sources.DataSourceLXD." - - -LStatResponse = namedtuple("lstatresponse", "st_mode") - - -NETWORK_V1 = { - "version": 1, - "config": [ - { - "type": "physical", "name": "eth0", - "subnets": [{"type": "dhcp", "control": "auto"}] - } - ] -} - - -def _add_network_v1_device(devname) -> dict: - """Helper to inject device name into default network v1 config.""" - network_cfg = deepcopy(NETWORK_V1) - network_cfg["config"][0]["name"] = devname - return network_cfg - - -LXD_V1_METADATA = { - "meta-data": "instance-id: my-lxc\nlocal-hostname: my-lxc\n\n", - "network-config": NETWORK_V1, - "user-data": "#cloud-config\npackages: [sl]\n", - "vendor-data": "#cloud-config\nruncmd: ['echo vendor-data']\n", - "config": { - "user.user-data": - "instance-id: my-lxc\nlocal-hostname: my-lxc\n\n", - "user.vendor-data": - "#cloud-config\nruncmd: ['echo vendor-data']\n", - "user.network-config": yaml.safe_dump(NETWORK_V1), - } -} - - -@pytest.fixture -def lxd_metadata(): - return LXD_V1_METADATA - - -@pytest.yield_fixture -def lxd_ds(request, paths, lxd_metadata): - """ - Return an instantiated DataSourceLXD. - - This also performs the mocking required for the default test case: - * ``is_platform_viable`` returns True, - * ``read_metadata`` returns ``LXD_V1_METADATA`` - - (This uses the paths fixture for the required helpers.Paths object) - """ - with mock.patch(DS_PATH + "is_platform_viable", return_value=True): - with mock.patch(DS_PATH + "read_metadata", return_value=lxd_metadata): - yield lxd.DataSourceLXD( - sys_cfg={}, distro=mock.Mock(), paths=paths - ) - - -class TestGenerateFallbackNetworkConfig: - - @pytest.mark.parametrize( - "uname_machine,systemd_detect_virt,expected", ( - # None for systemd_detect_virt returns None from which - ({}, None, NETWORK_V1), - ({}, None, NETWORK_V1), - ("anything", "lxc\n", NETWORK_V1), - # `uname -m` on kvm determines devname - ("x86_64", "kvm\n", _add_network_v1_device("enp5s0")), - ("ppc64le", "kvm\n", _add_network_v1_device("enp0s5")), - ("s390x", "kvm\n", _add_network_v1_device("enc9")) - ) - ) - @mock.patch(DS_PATH + "util.system_info") - @mock.patch(DS_PATH + "subp.subp") - @mock.patch(DS_PATH + "subp.which") - def test_net_v2_based_on_network_mode_virt_type_and_uname_machine( - self, - m_which, - m_subp, - m_system_info, - uname_machine, - systemd_detect_virt, - expected, - ): - """Return network config v2 based on uname -m, systemd-detect-virt.""" - if systemd_detect_virt is None: - m_which.return_value = None - m_system_info.return_value = {"uname": ["", "", "", "", uname_machine]} - m_subp.return_value = (systemd_detect_virt, "") - assert expected == lxd.generate_fallback_network_config() - if systemd_detect_virt is None: - assert 0 == m_subp.call_count - assert 0 == m_system_info.call_count - else: - assert [ - mock.call(["systemd-detect-virt"]) - ] == m_subp.call_args_list - if systemd_detect_virt != "kvm\n": - assert 0 == m_system_info.call_count - else: - assert 1 == m_system_info.call_count - - -class TestDataSourceLXD: - def test_platform_info(self, lxd_ds): - assert "LXD" == lxd_ds.dsname - assert "lxd" == lxd_ds.cloud_name - assert "lxd" == lxd_ds.platform_type - - def test_subplatform(self, lxd_ds): - assert "LXD socket API v. 1.0 (/dev/lxd/sock)" == lxd_ds.subplatform - - def test__get_data(self, lxd_ds): - """get_data calls read_metadata, setting appropiate instance attrs.""" - assert UNSET == lxd_ds._crawled_metadata - assert UNSET == lxd_ds._network_config - assert None is lxd_ds.userdata_raw - assert True is lxd_ds._get_data() - assert LXD_V1_METADATA == lxd_ds._crawled_metadata - # network-config is dumped from YAML - assert NETWORK_V1 == lxd_ds._network_config - # Any user-data and vendor-data are saved as raw - assert LXD_V1_METADATA["user-data"] == lxd_ds.userdata_raw - assert LXD_V1_METADATA["vendor-data"] == lxd_ds.vendordata_raw - - -class TestIsPlatformViable: - @pytest.mark.parametrize( - "exists,lstat_mode,expected", ( - (False, None, False), - (True, stat.S_IFREG, False), - (True, stat.S_IFSOCK, True), - ) - ) - @mock.patch(DS_PATH + "os.lstat") - @mock.patch(DS_PATH + "os.path.exists") - def test_expected_viable( - self, m_exists, m_lstat, exists, lstat_mode, expected - ): - """Return True only when LXD_SOCKET_PATH exists and is a socket.""" - m_exists.return_value = exists - m_lstat.return_value = LStatResponse(lstat_mode) - assert expected is lxd.is_platform_viable() - m_exists.assert_has_calls([mock.call(lxd.LXD_SOCKET_PATH)]) - if exists: - m_lstat.assert_has_calls([mock.call(lxd.LXD_SOCKET_PATH)]) - else: - assert 0 == m_lstat.call_count - - -class TestReadMetadata: - @pytest.mark.parametrize( - "url_responses,expected,logs", ( - ( # Assert non-JSON format from config route - { - "http://lxd/1.0/meta-data": "local-hostname: md\n", - "http://lxd/1.0/config": "[NOT_JSON", - }, - InvalidMetaDataException( - "Unable to determine cloud-init config from" - " http://lxd/1.0/config. Expected JSON but found:" - " [NOT_JSON"), - ["[GET] [HTTP:200] http://lxd/1.0/meta-data", - "[GET] [HTTP:200] http://lxd/1.0/config"], - ), - ( # Assert success on just meta-data - { - "http://lxd/1.0/meta-data": "local-hostname: md\n", - "http://lxd/1.0/config": "[]", - }, - { - "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION, - "config": {}, "meta-data": "local-hostname: md\n" - }, - ["[GET] [HTTP:200] http://lxd/1.0/meta-data", - "[GET] [HTTP:200] http://lxd/1.0/config"], - ), - ( # Assert 404s for config routes log skipping - { - "http://lxd/1.0/meta-data": "local-hostname: md\n", - "http://lxd/1.0/config": - '["/1.0/config/user.custom1",' - ' "/1.0/config/user.meta-data",' - ' "/1.0/config/user.network-config",' - ' "/1.0/config/user.user-data",' - ' "/1.0/config/user.vendor-data"]', - "http://lxd/1.0/config/user.custom1": "custom1", - "http://lxd/1.0/config/user.meta-data": "", # 404 - "http://lxd/1.0/config/user.network-config": "net-config", - "http://lxd/1.0/config/user.user-data": "", # 404 - "http://lxd/1.0/config/user.vendor-data": "", # 404 - }, - { - "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION, - "config": { - "user.custom1": "custom1", # Not promoted - "user.network-config": "net-config", - }, - "meta-data": "local-hostname: md\n", - "network-config": "net-config", - }, - [ - "Skipping http://lxd/1.0/config/user.vendor-data on" - " [HTTP:404]", - "Skipping http://lxd/1.0/config/user.meta-data on" - " [HTTP:404]", - "Skipping http://lxd/1.0/config/user.user-data on" - " [HTTP:404]", - "[GET] [HTTP:200] http://lxd/1.0/config", - "[GET] [HTTP:200] http://lxd/1.0/config/user.custom1", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/user.network-config", - ], - ), - ( # Assert all CONFIG_KEY_ALIASES promoted to top-level keys - { - "http://lxd/1.0/meta-data": "local-hostname: md\n", - "http://lxd/1.0/config": - '["/1.0/config/user.custom1",' - ' "/1.0/config/user.meta-data",' - ' "/1.0/config/user.network-config",' - ' "/1.0/config/user.user-data",' - ' "/1.0/config/user.vendor-data"]', - "http://lxd/1.0/config/user.custom1": "custom1", - "http://lxd/1.0/config/user.meta-data": "meta-data", - "http://lxd/1.0/config/user.network-config": "net-config", - "http://lxd/1.0/config/user.user-data": "user-data", - "http://lxd/1.0/config/user.vendor-data": "vendor-data", - }, - { - "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION, - "config": { - "user.custom1": "custom1", # Not promoted - "user.meta-data": "meta-data", - "user.network-config": "net-config", - "user.user-data": "user-data", - "user.vendor-data": "vendor-data", - }, - "meta-data": "local-hostname: md\n", - "network-config": "net-config", - "user-data": "user-data", - "vendor-data": "vendor-data", - }, - [ - "[GET] [HTTP:200] http://lxd/1.0/meta-data", - "[GET] [HTTP:200] http://lxd/1.0/config", - "[GET] [HTTP:200] http://lxd/1.0/config/user.custom1", - "[GET] [HTTP:200] http://lxd/1.0/config/user.meta-data", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/user.network-config", - "[GET] [HTTP:200] http://lxd/1.0/config/user.user-data", - "[GET] [HTTP:200] http://lxd/1.0/config/user.vendor-data", - ], - ), - ( # Assert cloud-init.* config key values prefered over user.* - { - "http://lxd/1.0/meta-data": "local-hostname: md\n", - "http://lxd/1.0/config": - '["/1.0/config/user.meta-data",' - ' "/1.0/config/user.network-config",' - ' "/1.0/config/user.user-data",' - ' "/1.0/config/user.vendor-data",' - ' "/1.0/config/cloud-init.network-config",' - ' "/1.0/config/cloud-init.user-data",' - ' "/1.0/config/cloud-init.vendor-data"]', - "http://lxd/1.0/config/user.meta-data": "user.meta-data", - "http://lxd/1.0/config/user.network-config": - "user.network-config", - "http://lxd/1.0/config/user.user-data": "user.user-data", - "http://lxd/1.0/config/user.vendor-data": - "user.vendor-data", - "http://lxd/1.0/config/cloud-init.meta-data": - "cloud-init.meta-data", - "http://lxd/1.0/config/cloud-init.network-config": - "cloud-init.network-config", - "http://lxd/1.0/config/cloud-init.user-data": - "cloud-init.user-data", - "http://lxd/1.0/config/cloud-init.vendor-data": - "cloud-init.vendor-data", - }, - { - "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION, - "config": { - "user.meta-data": "user.meta-data", - "user.network-config": "user.network-config", - "user.user-data": "user.user-data", - "user.vendor-data": "user.vendor-data", - "cloud-init.network-config": - "cloud-init.network-config", - "cloud-init.user-data": "cloud-init.user-data", - "cloud-init.vendor-data": - "cloud-init.vendor-data", - }, - "meta-data": "local-hostname: md\n", - "network-config": "cloud-init.network-config", - "user-data": "cloud-init.user-data", - "vendor-data": "cloud-init.vendor-data", - }, - [ - "[GET] [HTTP:200] http://lxd/1.0/meta-data", - "[GET] [HTTP:200] http://lxd/1.0/config", - "[GET] [HTTP:200] http://lxd/1.0/config/user.meta-data", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/user.network-config", - "[GET] [HTTP:200] http://lxd/1.0/config/user.user-data", - "[GET] [HTTP:200] http://lxd/1.0/config/user.vendor-data", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/cloud-init.network-config", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/cloud-init.user-data", - "[GET] [HTTP:200]" - " http://lxd/1.0/config/cloud-init.vendor-data", - "Ignoring LXD config user.user-data in favor of" - " cloud-init.user-data value.", - "Ignoring LXD config user.network-config in favor of" - " cloud-init.network-config value.", - "Ignoring LXD config user.vendor-data in favor of" - " cloud-init.vendor-data value.", - ], - ), - ) - ) - @mock.patch.object(lxd.requests.Session, 'get') - def test_read_metadata_handles_unexpected_content_or_http_status( - self, session_get, url_responses, expected, logs, caplog - ): - """read_metadata handles valid and invalid content and status codes.""" - - def fake_get(url): - """Mock Response json, ok, status_code, text from url_responses.""" - m_resp = mock.MagicMock() - content = url_responses.get(url, '') - m_resp.json.side_effect = lambda: json.loads(content) - if content: - mock_ok = mock.PropertyMock(return_value=True) - mock_status_code = mock.PropertyMock(return_value=200) - else: - mock_ok = mock.PropertyMock(return_value=False) - mock_status_code = mock.PropertyMock(return_value=404) - type(m_resp).ok = mock_ok - type(m_resp).status_code = mock_status_code - mock_text = mock.PropertyMock(return_value=content) - type(m_resp).text = mock_text - return m_resp - - session_get.side_effect = fake_get - - if isinstance(expected, Exception): - with pytest.raises(type(expected), match=re.escape(str(expected))): - lxd.read_metadata() - else: - assert expected == lxd.read_metadata() - caplogs = caplog.text - for log in logs: - assert log in caplogs - -# vi: ts=4 expandtab diff --git a/cloudinit/sources/tests/test_oracle.py b/cloudinit/sources/tests/test_oracle.py deleted file mode 100644 index 5f608cbb..00000000 --- a/cloudinit/sources/tests/test_oracle.py +++ /dev/null @@ -1,797 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import base64 -import copy -import json -from contextlib import ExitStack -from unittest import mock - -import pytest - -from cloudinit.sources import DataSourceOracle as oracle -from cloudinit.sources import NetworkConfigSource -from cloudinit.sources.DataSourceOracle import OpcMetadata -from cloudinit.tests import helpers as test_helpers -from cloudinit.url_helper import UrlError - -DS_PATH = "cloudinit.sources.DataSourceOracle" - -# `curl -L http://169.254.169.254/opc/v1/vnics/` on a Oracle Bare Metal Machine -# with a secondary VNIC attached (vnicId truncated for Python line length) -OPC_BM_SECONDARY_VNIC_RESPONSE = """\ -[ { - "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtyvcucqkhdqmgjszebxe4hrb!!TRUNCATED||", - "privateIp" : "10.0.0.8", - "vlanTag" : 0, - "macAddr" : "90:e2:ba:d4:f1:68", - "virtualRouterIp" : "10.0.0.1", - "subnetCidrBlock" : "10.0.0.0/24", - "nicIndex" : 0 -}, { - "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtfmkxjdy2sqidndiwrsg63zf!!TRUNCATED||", - "privateIp" : "10.0.4.5", - "vlanTag" : 1, - "macAddr" : "02:00:17:05:CF:51", - "virtualRouterIp" : "10.0.4.1", - "subnetCidrBlock" : "10.0.4.0/24", - "nicIndex" : 0 -} ]""" - -# `curl -L http://169.254.169.254/opc/v1/vnics/` on a Oracle Virtual Machine -# with a secondary VNIC attached -OPC_VM_SECONDARY_VNIC_RESPONSE = """\ -[ { - "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtch72z5pd76cc2636qeqh7z_truncated", - "privateIp" : "10.0.0.230", - "vlanTag" : 1039, - "macAddr" : "02:00:17:05:D1:DB", - "virtualRouterIp" : "10.0.0.1", - "subnetCidrBlock" : "10.0.0.0/24" -}, { - "vnicId" : "ocid1.vnic.oc1.phx.abyhqljt4iew3gwmvrwrhhf3bp5drj_truncated", - "privateIp" : "10.0.0.231", - "vlanTag" : 1041, - "macAddr" : "00:00:17:02:2B:B1", - "virtualRouterIp" : "10.0.0.1", - "subnetCidrBlock" : "10.0.0.0/24" -} ]""" - - -# Fetched with `curl http://169.254.169.254/opc/v1/instance/` (and then -# truncated for line length) -OPC_V2_METADATA = """\ -{ - "availabilityDomain" : "qIZq:PHX-AD-1", - "faultDomain" : "FAULT-DOMAIN-2", - "compartmentId" : "ocid1.tenancy.oc1..aaaaaaaao7f7cccogqrg5emjxkxmTRUNCATED", - "displayName" : "instance-20200320-1400", - "hostname" : "instance-20200320-1400", - "id" : "ocid1.instance.oc1.phx.anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED", - "image" : "ocid1.image.oc1.phx.aaaaaaaagmkn4gdhvvx24kiahh2b2qchsicTRUNCATED", - "metadata" : { - "ssh_authorized_keys" : "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated", - "user_data" : "IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v" - }, - "region" : "phx", - "canonicalRegionName" : "us-phoenix-1", - "ociAdName" : "phx-ad-3", - "shape" : "VM.Standard2.1", - "state" : "Running", - "timeCreated" : 1584727285318, - "agentConfig" : { - "monitoringDisabled" : true, - "managementDisabled" : true - } -}""" - -# Just a small meaningless change to differentiate the two metadatas -OPC_V1_METADATA = OPC_V2_METADATA.replace("ocid1.instance", "ocid2.instance") - - -@pytest.fixture -def metadata_version(): - return 2 - - -@pytest.yield_fixture -def oracle_ds(request, fixture_utils, paths, metadata_version): - """ - Return an instantiated DataSourceOracle. - - This also performs the mocking required for the default test case: - * ``_read_system_uuid`` returns something, - * ``_is_platform_viable`` returns True, - * ``_is_iscsi_root`` returns True (the simpler code path), - * ``read_opc_metadata`` returns ``OPC_V1_METADATA`` - - (This uses the paths fixture for the required helpers.Paths object, and the - fixture_utils fixture for fetching markers.) - """ - sys_cfg = fixture_utils.closest_marker_first_arg_or( - request, "ds_sys_cfg", mock.MagicMock() - ) - metadata = OpcMetadata(metadata_version, json.loads(OPC_V2_METADATA), None) - with mock.patch(DS_PATH + "._read_system_uuid", return_value="someuuid"): - with mock.patch(DS_PATH + "._is_platform_viable", return_value=True): - with mock.patch(DS_PATH + "._is_iscsi_root", return_value=True): - with mock.patch( - DS_PATH + ".read_opc_metadata", - return_value=metadata, - ): - yield oracle.DataSourceOracle( - sys_cfg=sys_cfg, distro=mock.Mock(), paths=paths, - ) - - -class TestDataSourceOracle: - def test_platform_info(self, oracle_ds): - assert "oracle" == oracle_ds.cloud_name - assert "oracle" == oracle_ds.platform_type - - def test_subplatform_before_fetch(self, oracle_ds): - assert 'unknown' == oracle_ds.subplatform - - def test_platform_info_after_fetch(self, oracle_ds): - oracle_ds._get_data() - assert 'metadata (http://169.254.169.254/opc/v2/)' == \ - oracle_ds.subplatform - - @pytest.mark.parametrize('metadata_version', [1]) - def test_v1_platform_info_after_fetch(self, oracle_ds): - oracle_ds._get_data() - assert 'metadata (http://169.254.169.254/opc/v1/)' == \ - oracle_ds.subplatform - - def test_secondary_nics_disabled_by_default(self, oracle_ds): - assert not oracle_ds.ds_cfg["configure_secondary_nics"] - - @pytest.mark.ds_sys_cfg( - {"datasource": {"Oracle": {"configure_secondary_nics": True}}} - ) - def test_sys_cfg_can_enable_configure_secondary_nics(self, oracle_ds): - assert oracle_ds.ds_cfg["configure_secondary_nics"] - - -class TestIsPlatformViable(test_helpers.CiTestCase): - @mock.patch(DS_PATH + ".dmi.read_dmi_data", - return_value=oracle.CHASSIS_ASSET_TAG) - def test_expected_viable(self, m_read_dmi_data): - """System with known chassis tag is viable.""" - self.assertTrue(oracle._is_platform_viable()) - m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')]) - - @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value=None) - def test_expected_not_viable_dmi_data_none(self, m_read_dmi_data): - """System without known chassis tag is not viable.""" - self.assertFalse(oracle._is_platform_viable()) - m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')]) - - @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value="LetsGoCubs") - def test_expected_not_viable_other(self, m_read_dmi_data): - """System with unnown chassis tag is not viable.""" - self.assertFalse(oracle._is_platform_viable()) - m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')]) - - -@mock.patch( - "cloudinit.net.is_openvswitch_internal_interface", - mock.Mock(return_value=False) -) -class TestNetworkConfigFromOpcImds: - def test_no_secondary_nics_does_not_mutate_input(self, oracle_ds): - oracle_ds._vnics_data = [{}] - # We test this by using in a non-dict to ensure that no dict - # operations are used; failure would be seen as exceptions - oracle_ds._network_config = object() - oracle_ds._add_network_config_from_opc_imds() - - def test_bare_metal_machine_skipped(self, oracle_ds, caplog): - # nicIndex in the first entry indicates a bare metal machine - oracle_ds._vnics_data = json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE) - # We test this by using a non-dict to ensure that no dict - # operations are used - oracle_ds._network_config = object() - oracle_ds._add_network_config_from_opc_imds() - assert 'bare metal machine' in caplog.text - - def test_missing_mac_skipped(self, oracle_ds, caplog): - oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE) - - oracle_ds._network_config = { - 'version': 1, 'config': [{'primary': 'nic'}] - } - with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}): - oracle_ds._add_network_config_from_opc_imds() - - assert 1 == len(oracle_ds.network_config['config']) - assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \ - caplog.text - - def test_missing_mac_skipped_v2(self, oracle_ds, caplog): - oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE) - - oracle_ds._network_config = { - 'version': 2, 'ethernets': {'primary': {'nic': {}}} - } - with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}): - oracle_ds._add_network_config_from_opc_imds() - - assert 1 == len(oracle_ds.network_config['ethernets']) - assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \ - caplog.text - - def test_secondary_nic(self, oracle_ds): - oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE) - oracle_ds._network_config = { - 'version': 1, 'config': [{'primary': 'nic'}] - } - mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3' - with mock.patch(DS_PATH + ".get_interfaces_by_mac", - return_value={mac_addr: nic_name}): - oracle_ds._add_network_config_from_opc_imds() - - # The input is mutated - assert 2 == len(oracle_ds.network_config['config']) - - secondary_nic_cfg = oracle_ds.network_config['config'][1] - assert nic_name == secondary_nic_cfg['name'] - assert 'physical' == secondary_nic_cfg['type'] - assert mac_addr == secondary_nic_cfg['mac_address'] - assert 9000 == secondary_nic_cfg['mtu'] - - assert 1 == len(secondary_nic_cfg['subnets']) - subnet_cfg = secondary_nic_cfg['subnets'][0] - # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE - assert '10.0.0.231' == subnet_cfg['address'] - - def test_secondary_nic_v2(self, oracle_ds): - oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE) - oracle_ds._network_config = { - 'version': 2, 'ethernets': {'primary': {'nic': {}}} - } - mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3' - with mock.patch(DS_PATH + ".get_interfaces_by_mac", - return_value={mac_addr: nic_name}): - oracle_ds._add_network_config_from_opc_imds() - - # The input is mutated - assert 2 == len(oracle_ds.network_config['ethernets']) - - secondary_nic_cfg = oracle_ds.network_config['ethernets']['ens3'] - assert secondary_nic_cfg['dhcp4'] is False - assert secondary_nic_cfg['dhcp6'] is False - assert mac_addr == secondary_nic_cfg['match']['macaddress'] - assert 9000 == secondary_nic_cfg['mtu'] - - assert 1 == len(secondary_nic_cfg['addresses']) - # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE - assert '10.0.0.231' == secondary_nic_cfg['addresses'][0] - - -class TestNetworkConfigFiltersNetFailover(test_helpers.CiTestCase): - - def setUp(self): - super(TestNetworkConfigFiltersNetFailover, self).setUp() - self.add_patch(DS_PATH + '.get_interfaces_by_mac', - 'm_get_interfaces_by_mac') - self.add_patch(DS_PATH + '.is_netfail_master', 'm_netfail_master') - - def test_ignore_bogus_network_config(self): - netcfg = {'something': 'here'} - passed_netcfg = copy.copy(netcfg) - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - - def test_ignore_network_config_unknown_versions(self): - netcfg = {'something': 'here', 'version': 3} - passed_netcfg = copy.copy(netcfg) - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - - def test_checks_v1_type_physical_interfaces(self): - mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3' - self.m_get_interfaces_by_mac.return_value = { - mac_addr: nic_name, - } - netcfg = {'version': 1, 'config': [ - {'type': 'physical', 'name': nic_name, 'mac_address': mac_addr, - 'subnets': [{'type': 'dhcp4'}]}]} - passed_netcfg = copy.copy(netcfg) - self.m_netfail_master.return_value = False - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - self.assertEqual([mock.call(nic_name)], - self.m_netfail_master.call_args_list) - - def test_checks_v1_skips_non_phys_interfaces(self): - mac_addr, nic_name = '00:00:17:02:2b:b1', 'bond0' - self.m_get_interfaces_by_mac.return_value = { - mac_addr: nic_name, - } - netcfg = {'version': 1, 'config': [ - {'type': 'bond', 'name': nic_name, 'mac_address': mac_addr, - 'subnets': [{'type': 'dhcp4'}]}]} - passed_netcfg = copy.copy(netcfg) - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - self.assertEqual(0, self.m_netfail_master.call_count) - - def test_removes_master_mac_property_v1(self): - nic_master, mac_master = 'ens3', self.random_string() - nic_other, mac_other = 'ens7', self.random_string() - nic_extra, mac_extra = 'enp0s1f2', self.random_string() - self.m_get_interfaces_by_mac.return_value = { - mac_master: nic_master, - mac_other: nic_other, - mac_extra: nic_extra, - } - netcfg = {'version': 1, 'config': [ - {'type': 'physical', 'name': nic_master, - 'mac_address': mac_master}, - {'type': 'physical', 'name': nic_other, 'mac_address': mac_other}, - {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra}, - ]} - - def _is_netfail_master(iface): - if iface == 'ens3': - return True - return False - self.m_netfail_master.side_effect = _is_netfail_master - expected_cfg = {'version': 1, 'config': [ - {'type': 'physical', 'name': nic_master}, - {'type': 'physical', 'name': nic_other, 'mac_address': mac_other}, - {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra}, - ]} - oracle._ensure_netfailover_safe(netcfg) - self.assertEqual(expected_cfg, netcfg) - - def test_checks_v2_type_ethernet_interfaces(self): - mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3' - self.m_get_interfaces_by_mac.return_value = { - mac_addr: nic_name, - } - netcfg = {'version': 2, 'ethernets': { - nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name, - 'match': {'macaddress': mac_addr}}}} - passed_netcfg = copy.copy(netcfg) - self.m_netfail_master.return_value = False - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - self.assertEqual([mock.call(nic_name)], - self.m_netfail_master.call_args_list) - - def test_skips_v2_non_ethernet_interfaces(self): - mac_addr, nic_name = '00:00:17:02:2b:b1', 'wlps0' - self.m_get_interfaces_by_mac.return_value = { - mac_addr: nic_name, - } - netcfg = {'version': 2, 'wifis': { - nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name, - 'match': {'macaddress': mac_addr}}}} - passed_netcfg = copy.copy(netcfg) - oracle._ensure_netfailover_safe(passed_netcfg) - self.assertEqual(netcfg, passed_netcfg) - self.assertEqual(0, self.m_netfail_master.call_count) - - def test_removes_master_mac_property_v2(self): - nic_master, mac_master = 'ens3', self.random_string() - nic_other, mac_other = 'ens7', self.random_string() - nic_extra, mac_extra = 'enp0s1f2', self.random_string() - self.m_get_interfaces_by_mac.return_value = { - mac_master: nic_master, - mac_other: nic_other, - mac_extra: nic_extra, - } - netcfg = {'version': 2, 'ethernets': { - nic_extra: {'dhcp4': True, 'set-name': nic_extra, - 'match': {'macaddress': mac_extra}}, - nic_other: {'dhcp4': True, 'set-name': nic_other, - 'match': {'macaddress': mac_other}}, - nic_master: {'dhcp4': True, 'set-name': nic_master, - 'match': {'macaddress': mac_master}}, - }} - - def _is_netfail_master(iface): - if iface == 'ens3': - return True - return False - self.m_netfail_master.side_effect = _is_netfail_master - - expected_cfg = {'version': 2, 'ethernets': { - nic_master: {'dhcp4': True, 'match': {'name': nic_master}}, - nic_extra: {'dhcp4': True, 'set-name': nic_extra, - 'match': {'macaddress': mac_extra}}, - nic_other: {'dhcp4': True, 'set-name': nic_other, - 'match': {'macaddress': mac_other}}, - }} - oracle._ensure_netfailover_safe(netcfg) - import pprint - pprint.pprint(netcfg) - print('---- ^^ modified ^^ ---- vv original vv ----') - pprint.pprint(expected_cfg) - self.assertEqual(expected_cfg, netcfg) - - -def _mock_v2_urls(httpretty): - def instance_callback(request, uri, response_headers): - print(response_headers) - assert request.headers.get("Authorization") == "Bearer Oracle" - return [200, response_headers, OPC_V2_METADATA] - - def vnics_callback(request, uri, response_headers): - assert request.headers.get("Authorization") == "Bearer Oracle" - return [200, response_headers, OPC_BM_SECONDARY_VNIC_RESPONSE] - - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v2/instance/", - body=instance_callback - ) - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v2/vnics/", - body=vnics_callback - ) - - -def _mock_no_v2_urls(httpretty): - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v2/instance/", - status=404, - ) - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v1/instance/", - body=OPC_V1_METADATA - ) - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v1/vnics/", - body=OPC_BM_SECONDARY_VNIC_RESPONSE - ) - - -class TestReadOpcMetadata: - # See https://docs.pytest.org/en/stable/example - # /parametrize.html#parametrizing-conditional-raising - does_not_raise = ExitStack - - @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None) - @pytest.mark.parametrize( - 'version,setup_urls,instance_data,fetch_vnics,vnics_data', [ - (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), True, - json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)), - (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), False, None), - (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), True, - json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)), - (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), False, None), - ] - ) - def test_metadata_returned( - self, version, setup_urls, instance_data, - fetch_vnics, vnics_data, httpretty - ): - setup_urls(httpretty) - metadata = oracle.read_opc_metadata(fetch_vnics_data=fetch_vnics) - - assert version == metadata.version - assert instance_data == metadata.instance_data - assert vnics_data == metadata.vnics_data - - # No need to actually wait between retries in the tests - @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None) - @pytest.mark.parametrize( - "v2_failure_count,v1_failure_count,expected_body,expectation", - [ - (1, 0, json.loads(OPC_V2_METADATA), does_not_raise()), - (2, 0, json.loads(OPC_V2_METADATA), does_not_raise()), - (3, 0, json.loads(OPC_V1_METADATA), does_not_raise()), - (3, 1, json.loads(OPC_V1_METADATA), does_not_raise()), - (3, 2, json.loads(OPC_V1_METADATA), does_not_raise()), - (3, 3, None, pytest.raises(UrlError)), - ] - ) - def test_retries(self, v2_failure_count, v1_failure_count, - expected_body, expectation, httpretty): - v2_responses = [httpretty.Response("", status=404)] * v2_failure_count - v2_responses.append(httpretty.Response(OPC_V2_METADATA)) - v1_responses = [httpretty.Response("", status=404)] * v1_failure_count - v1_responses.append(httpretty.Response(OPC_V1_METADATA)) - - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v1/instance/", - responses=v1_responses, - ) - httpretty.register_uri( - httpretty.GET, - "http://169.254.169.254/opc/v2/instance/", - responses=v2_responses, - ) - with expectation: - assert expected_body == oracle.read_opc_metadata().instance_data - - -class TestCommon_GetDataBehaviour: - """This test class tests behaviour common to iSCSI and non-iSCSI root. - - It defines a fixture, parameterized_oracle_ds, which is used in all the - tests herein to test that the commonly expected behaviour is the same with - iSCSI root and without. - - (As non-iSCSI root behaviour is a superset of iSCSI root behaviour this - class is implicitly also testing all iSCSI root behaviour so there is no - separate class for that case.) - """ - - @pytest.yield_fixture(params=[True, False]) - def parameterized_oracle_ds(self, request, oracle_ds): - """oracle_ds parameterized for iSCSI and non-iSCSI root respectively""" - is_iscsi_root = request.param - with ExitStack() as stack: - stack.enter_context( - mock.patch( - DS_PATH + "._is_iscsi_root", return_value=is_iscsi_root - ) - ) - if not is_iscsi_root: - stack.enter_context( - mock.patch(DS_PATH + ".net.find_fallback_nic") - ) - stack.enter_context( - mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4") - ) - yield oracle_ds - - @mock.patch( - DS_PATH + "._is_platform_viable", mock.Mock(return_value=False) - ) - def test_false_if_platform_not_viable( - self, parameterized_oracle_ds, - ): - assert not parameterized_oracle_ds._get_data() - - @pytest.mark.parametrize( - "keyname,expected_value", - ( - ("availability-zone", "phx-ad-3"), - ("launch-index", 0), - ("local-hostname", "instance-20200320-1400"), - ( - "instance-id", - "ocid1.instance.oc1.phx" - ".anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED", - ), - ("name", "instance-20200320-1400"), - ( - "public_keys", - "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated", - ), - ), - ) - def test_metadata_keys_set_correctly( - self, keyname, expected_value, parameterized_oracle_ds, - ): - assert parameterized_oracle_ds._get_data() - assert expected_value == parameterized_oracle_ds.metadata[keyname] - - @pytest.mark.parametrize( - "attribute_name,expected_value", - [ - ("_crawled_metadata", json.loads(OPC_V2_METADATA)), - ( - "userdata_raw", - base64.b64decode(b"IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"), - ), - ("system_uuid", "my-test-uuid"), - ], - ) - @mock.patch( - DS_PATH + "._read_system_uuid", mock.Mock(return_value="my-test-uuid") - ) - def test_attributes_set_correctly( - self, attribute_name, expected_value, parameterized_oracle_ds, - ): - assert parameterized_oracle_ds._get_data() - assert expected_value == getattr( - parameterized_oracle_ds, attribute_name - ) - - @pytest.mark.parametrize( - "ssh_keys,expected_value", - [ - # No SSH keys in metadata => no keys detected - (None, []), - # Empty SSH keys in metadata => no keys detected - ("", []), - # Single SSH key in metadata => single key detected - ("ssh-rsa ... test@test", ["ssh-rsa ... test@test"]), - # Multiple SSH keys in metadata => multiple keys detected - ( - "ssh-rsa ... test@test\nssh-rsa ... test2@test2", - ["ssh-rsa ... test@test", "ssh-rsa ... test2@test2"], - ), - ], - ) - def test_public_keys_handled_correctly( - self, ssh_keys, expected_value, parameterized_oracle_ds - ): - instance_data = json.loads(OPC_V1_METADATA) - if ssh_keys is None: - del instance_data["metadata"]["ssh_authorized_keys"] - else: - instance_data["metadata"]["ssh_authorized_keys"] = ssh_keys - metadata = OpcMetadata(None, instance_data, None) - with mock.patch( - DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata), - ): - assert parameterized_oracle_ds._get_data() - assert ( - expected_value == parameterized_oracle_ds.get_public_ssh_keys() - ) - - def test_missing_user_data_handled_gracefully( - self, parameterized_oracle_ds - ): - instance_data = json.loads(OPC_V1_METADATA) - del instance_data["metadata"]["user_data"] - metadata = OpcMetadata(None, instance_data, None) - with mock.patch( - DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata), - ): - assert parameterized_oracle_ds._get_data() - - assert parameterized_oracle_ds.userdata_raw is None - - def test_missing_metadata_handled_gracefully( - self, parameterized_oracle_ds - ): - instance_data = json.loads(OPC_V1_METADATA) - del instance_data["metadata"] - metadata = OpcMetadata(None, instance_data, None) - with mock.patch( - DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata), - ): - assert parameterized_oracle_ds._get_data() - - assert parameterized_oracle_ds.userdata_raw is None - assert [] == parameterized_oracle_ds.get_public_ssh_keys() - - -@mock.patch(DS_PATH + "._is_iscsi_root", lambda: False) -class TestNonIscsiRoot_GetDataBehaviour: - @mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4") - @mock.patch(DS_PATH + ".net.find_fallback_nic") - def test_read_opc_metadata_called_with_ephemeral_dhcp( - self, m_find_fallback_nic, m_EphemeralDHCPv4, oracle_ds - ): - in_context_manager = False - - def enter_context_manager(): - nonlocal in_context_manager - in_context_manager = True - - def exit_context_manager(*args): - nonlocal in_context_manager - in_context_manager = False - - m_EphemeralDHCPv4.return_value.__enter__.side_effect = ( - enter_context_manager - ) - m_EphemeralDHCPv4.return_value.__exit__.side_effect = ( - exit_context_manager - ) - - def assert_in_context_manager(**kwargs): - assert in_context_manager - return mock.MagicMock() - - with mock.patch( - DS_PATH + ".read_opc_metadata", - mock.Mock(side_effect=assert_in_context_manager), - ): - assert oracle_ds._get_data() - - assert [ - mock.call( - iface=m_find_fallback_nic.return_value, - connectivity_url_data={ - 'headers': { - 'Authorization': 'Bearer Oracle' - }, - 'url': 'http://169.254.169.254/opc/v2/instance/' - } - ) - ] == m_EphemeralDHCPv4.call_args_list - - -@mock.patch(DS_PATH + ".get_interfaces_by_mac", lambda: {}) -@mock.patch(DS_PATH + ".cmdline.read_initramfs_config") -class TestNetworkConfig: - def test_network_config_cached(self, m_read_initramfs_config, oracle_ds): - """.network_config should be cached""" - assert 0 == m_read_initramfs_config.call_count - oracle_ds.network_config # pylint: disable=pointless-statement - assert 1 == m_read_initramfs_config.call_count - oracle_ds.network_config # pylint: disable=pointless-statement - assert 1 == m_read_initramfs_config.call_count - - def test_network_cmdline(self, m_read_initramfs_config, oracle_ds): - """network_config should prefer initramfs config over fallback""" - ncfg = {"version": 1, "config": [{"a": "b"}]} - m_read_initramfs_config.return_value = copy.deepcopy(ncfg) - - assert ncfg == oracle_ds.network_config - assert 0 == oracle_ds.distro.generate_fallback_config.call_count - - def test_network_fallback(self, m_read_initramfs_config, oracle_ds): - """network_config should prefer initramfs config over fallback""" - ncfg = {"version": 1, "config": [{"a": "b"}]} - - m_read_initramfs_config.return_value = None - oracle_ds.distro.generate_fallback_config.return_value = copy.deepcopy( - ncfg - ) - - assert ncfg == oracle_ds.network_config - - @pytest.mark.parametrize( - "configure_secondary_nics,expect_secondary_nics", - [(True, True), (False, False), (None, False)], - ) - def test_secondary_nic_addition( - self, - m_read_initramfs_config, - configure_secondary_nics, - expect_secondary_nics, - oracle_ds, - ): - """Test that _add_network_config_from_opc_imds is called as expected - - (configure_secondary_nics=None is used to test the default behaviour.) - """ - m_read_initramfs_config.return_value = {"version": 1, "config": []} - - if configure_secondary_nics is not None: - oracle_ds.ds_cfg[ - "configure_secondary_nics" - ] = configure_secondary_nics - - def side_effect(self): - self._network_config["secondary_added"] = mock.sentinel.needle - - oracle_ds._vnics_data = 'DummyData' - with mock.patch.object( - oracle.DataSourceOracle, "_add_network_config_from_opc_imds", - new=side_effect, - ): - was_secondary_added = "secondary_added" in oracle_ds.network_config - assert expect_secondary_nics == was_secondary_added - - def test_secondary_nic_failure_isnt_blocking( - self, - m_read_initramfs_config, - caplog, - oracle_ds, - ): - oracle_ds.ds_cfg["configure_secondary_nics"] = True - oracle_ds._vnics_data = "DummyData" - - with mock.patch.object( - oracle.DataSourceOracle, "_add_network_config_from_opc_imds", - side_effect=Exception() - ): - network_config = oracle_ds.network_config - assert network_config == m_read_initramfs_config.return_value - assert "Failed to parse secondary network configuration" in caplog.text - - def test_ds_network_cfg_preferred_over_initramfs(self, _m): - """Ensure that DS net config is preferred over initramfs config""" - config_sources = oracle.DataSourceOracle.network_config_sources - ds_idx = config_sources.index(NetworkConfigSource.ds) - initramfs_idx = config_sources.index(NetworkConfigSource.initramfs) - assert ds_idx < initramfs_idx - - -# vi: ts=4 expandtab -- cgit v1.2.3