summaryrefslogtreecommitdiff
path: root/cloudinit/sources
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources')
-rw-r--r--cloudinit/sources/helpers/tests/test_netlink.py480
-rw-r--r--cloudinit/sources/helpers/tests/test_openstack.py49
-rw-r--r--cloudinit/sources/tests/__init__.py0
-rw-r--r--cloudinit/sources/tests/test_init.py771
-rw-r--r--cloudinit/sources/tests/test_lxd.py376
-rw-r--r--cloudinit/sources/tests/test_oracle.py797
6 files changed, 0 insertions, 2473 deletions
diff --git a/cloudinit/sources/helpers/tests/test_netlink.py b/cloudinit/sources/helpers/tests/test_netlink.py
deleted file mode 100644
index cafe3961..00000000
--- a/cloudinit/sources/helpers/tests/test_netlink.py
+++ /dev/null
@@ -1,480 +0,0 @@
-# Author: Tamilmani Manoharan <tamanoha@microsoft.com>
-#
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.tests.helpers import CiTestCase, mock
-import socket
-import struct
-import codecs
-from cloudinit.sources.helpers.netlink import (
- NetlinkCreateSocketError, create_bound_netlink_socket, read_netlink_socket,
- read_rta_oper_state, unpack_rta_attr, wait_for_media_disconnect_connect,
- wait_for_nic_attach_event, wait_for_nic_detach_event,
- OPER_DOWN, OPER_UP, OPER_DORMANT, OPER_LOWERLAYERDOWN, OPER_NOTPRESENT,
- OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_DELLINK,
- RTM_SETLINK, RTM_GETLINK, MAX_SIZE)
-
-
-def int_to_bytes(i):
- '''convert integer to binary: eg: 1 to \x01'''
- hex_value = '{0:x}'.format(i)
- hex_value = '0' * (len(hex_value) % 2) + hex_value
- return codecs.decode(hex_value, 'hex_codec')
-
-
-class TestCreateBoundNetlinkSocket(CiTestCase):
-
- @mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
- def test_socket_error_on_create(self, m_socket):
- '''create_bound_netlink_socket catches socket creation exception'''
-
- """NetlinkCreateSocketError is raised when socket creation errors."""
- m_socket.side_effect = socket.error("Fake socket failure")
- with self.assertRaises(NetlinkCreateSocketError) as ctx_mgr:
- create_bound_netlink_socket()
- self.assertEqual(
- 'Exception during netlink socket create: Fake socket failure',
- str(ctx_mgr.exception))
-
-
-class TestReadNetlinkSocket(CiTestCase):
-
- @mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
- @mock.patch('cloudinit.sources.helpers.netlink.select.select')
- def test_read_netlink_socket(self, m_select, m_socket):
- '''read_netlink_socket able to receive data'''
- data = 'netlinktest'
- m_select.return_value = [m_socket], None, None
- m_socket.recv.return_value = data
- recv_data = read_netlink_socket(m_socket, 2)
- m_select.assert_called_with([m_socket], [], [], 2)
- m_socket.recv.assert_called_with(MAX_SIZE)
- self.assertIsNotNone(recv_data)
- self.assertEqual(recv_data, data)
-
- @mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
- @mock.patch('cloudinit.sources.helpers.netlink.select.select')
- def test_netlink_read_timeout(self, m_select, m_socket):
- '''read_netlink_socket should timeout if nothing to read'''
- m_select.return_value = [], None, None
- data = read_netlink_socket(m_socket, 1)
- m_select.assert_called_with([m_socket], [], [], 1)
- self.assertEqual(m_socket.recv.call_count, 0)
- self.assertIsNone(data)
-
- def test_read_invalid_socket(self):
- '''read_netlink_socket raises assert error if socket is invalid'''
- socket = None
- with self.assertRaises(AssertionError) as context:
- read_netlink_socket(socket, 1)
- self.assertTrue('netlink socket is none' in str(context.exception))
-
-
-class TestParseNetlinkMessage(CiTestCase):
-
- def test_read_rta_oper_state(self):
- '''read_rta_oper_state could parse netlink message and extract data'''
- ifname = "eth0"
- bytes = ifname.encode("utf-8")
- buf = bytearray(48)
- struct.pack_into("HH4sHHc", buf, RTATTR_START_OFFSET, 8, 3, bytes, 5,
- 16, int_to_bytes(OPER_DOWN))
- interface_state = read_rta_oper_state(buf)
- self.assertEqual(interface_state.ifname, ifname)
- self.assertEqual(interface_state.operstate, OPER_DOWN)
-
- def test_read_none_data(self):
- '''read_rta_oper_state raises assert error if data is none'''
- data = None
- with self.assertRaises(AssertionError) as context:
- read_rta_oper_state(data)
- self.assertEqual('data is none', str(context.exception))
-
- def test_read_invalid_rta_operstate_none(self):
- '''read_rta_oper_state returns none if operstate is none'''
- ifname = "eth0"
- buf = bytearray(40)
- bytes = ifname.encode("utf-8")
- struct.pack_into("HH4s", buf, RTATTR_START_OFFSET, 8, 3, bytes)
- interface_state = read_rta_oper_state(buf)
- self.assertIsNone(interface_state)
-
- def test_read_invalid_rta_ifname_none(self):
- '''read_rta_oper_state returns none if ifname is none'''
- buf = bytearray(40)
- struct.pack_into("HHc", buf, RTATTR_START_OFFSET, 5, 16,
- int_to_bytes(OPER_DOWN))
- interface_state = read_rta_oper_state(buf)
- self.assertIsNone(interface_state)
-
- def test_read_invalid_data_len(self):
- '''raise assert error if data size is smaller than required size'''
- buf = bytearray(32)
- with self.assertRaises(AssertionError) as context:
- read_rta_oper_state(buf)
- self.assertTrue('length of data is smaller than RTATTR_START_OFFSET' in
- str(context.exception))
-
- def test_unpack_rta_attr_none_data(self):
- '''unpack_rta_attr raises assert error if data is none'''
- data = None
- with self.assertRaises(AssertionError) as context:
- unpack_rta_attr(data, RTATTR_START_OFFSET)
- self.assertTrue('data is none' in str(context.exception))
-
- def test_unpack_rta_attr_invalid_offset(self):
- '''unpack_rta_attr raises assert error if offset is invalid'''
- data = bytearray(48)
- with self.assertRaises(AssertionError) as context:
- unpack_rta_attr(data, "offset")
- self.assertTrue('offset is not integer' in str(context.exception))
- with self.assertRaises(AssertionError) as context:
- unpack_rta_attr(data, 31)
- self.assertTrue('rta offset is less than expected length' in
- str(context.exception))
-
-
-@mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
-@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket')
-class TestNicAttachDetach(CiTestCase):
- with_logs = True
-
- def _media_switch_data(self, ifname, msg_type, operstate):
- '''construct netlink data with specified fields'''
- if ifname and operstate is not None:
- data = bytearray(48)
- bytes = ifname.encode("utf-8")
- struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3,
- bytes, 5, 16, int_to_bytes(operstate))
- elif ifname:
- data = bytearray(40)
- bytes = ifname.encode("utf-8")
- struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes)
- elif operstate:
- data = bytearray(40)
- struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16,
- int_to_bytes(operstate))
- struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0)
- return data
-
- def test_nic_attached_oper_down(self, m_read_netlink_socket, m_socket):
- '''Test for a new nic attached'''
- ifname = "eth0"
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- m_read_netlink_socket.side_effect = [data_op_down]
- ifread = wait_for_nic_attach_event(m_socket, [])
- self.assertEqual(m_read_netlink_socket.call_count, 1)
- self.assertEqual(ifname, ifread)
-
- def test_nic_attached_oper_up(self, m_read_netlink_socket, m_socket):
- '''Test for a new nic attached'''
- ifname = "eth0"
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- m_read_netlink_socket.side_effect = [data_op_up]
- ifread = wait_for_nic_attach_event(m_socket, [])
- self.assertEqual(m_read_netlink_socket.call_count, 1)
- self.assertEqual(ifname, ifread)
-
- def test_nic_attach_ignore_existing(self, m_read_netlink_socket, m_socket):
- '''Test that we read only the interfaces we are interested in.'''
- data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN)
- data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN)
- m_read_netlink_socket.side_effect = [data_eth0, data_eth1]
- ifread = wait_for_nic_attach_event(m_socket, ["eth0"])
- self.assertEqual(m_read_netlink_socket.call_count, 2)
- self.assertEqual("eth1", ifread)
-
- def test_nic_attach_read_first(self, m_read_netlink_socket, m_socket):
- '''Test that we read only the interfaces we are interested in.'''
- data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN)
- data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN)
- m_read_netlink_socket.side_effect = [data_eth0, data_eth1]
- ifread = wait_for_nic_attach_event(m_socket, ["eth1"])
- self.assertEqual(m_read_netlink_socket.call_count, 1)
- self.assertEqual("eth0", ifread)
-
- def test_nic_detached(self, m_read_netlink_socket, m_socket):
- '''Test for an existing nic detached'''
- ifname = "eth0"
- data_op_down = self._media_switch_data(ifname, RTM_DELLINK, OPER_DOWN)
- m_read_netlink_socket.side_effect = [data_op_down]
- ifread = wait_for_nic_detach_event(m_socket)
- self.assertEqual(m_read_netlink_socket.call_count, 1)
- self.assertEqual(ifname, ifread)
-
-
-@mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
-@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket')
-class TestWaitForMediaDisconnectConnect(CiTestCase):
- with_logs = True
-
- def _media_switch_data(self, ifname, msg_type, operstate):
- '''construct netlink data with specified fields'''
- if ifname and operstate is not None:
- data = bytearray(48)
- bytes = ifname.encode("utf-8")
- struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3,
- bytes, 5, 16, int_to_bytes(operstate))
- elif ifname:
- data = bytearray(40)
- bytes = ifname.encode("utf-8")
- struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes)
- elif operstate:
- data = bytearray(40)
- struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16,
- int_to_bytes(operstate))
- struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0)
- return data
-
- def test_media_down_up_scenario(self, m_read_netlink_socket,
- m_socket):
- '''Test for media down up sequence for required interface name'''
- ifname = "eth0"
- # construct data for Oper State down
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- # construct data for Oper State up
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- m_read_netlink_socket.side_effect = [data_op_down, data_op_up]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 2)
-
- def test_wait_for_media_switch_diff_interface(self, m_read_netlink_socket,
- m_socket):
- '''wait_for_media_disconnect_connect ignores unexpected interfaces.
-
- The first two messages are for other interfaces and last two are for
- expected interface. So the function exit only after receiving last
- 2 messages and therefore the call count for m_read_netlink_socket
- has to be 4
- '''
- other_ifname = "eth1"
- expected_ifname = "eth0"
- data_op_down_eth1 = self._media_switch_data(
- other_ifname, RTM_NEWLINK, OPER_DOWN
- )
- data_op_up_eth1 = self._media_switch_data(
- other_ifname, RTM_NEWLINK, OPER_UP
- )
- data_op_down_eth0 = self._media_switch_data(
- expected_ifname, RTM_NEWLINK, OPER_DOWN
- )
- data_op_up_eth0 = self._media_switch_data(
- expected_ifname, RTM_NEWLINK, OPER_UP)
- m_read_netlink_socket.side_effect = [
- data_op_down_eth1,
- data_op_up_eth1,
- data_op_down_eth0,
- data_op_up_eth0
- ]
- wait_for_media_disconnect_connect(m_socket, expected_ifname)
- self.assertIn('Ignored netlink event on interface %s' % other_ifname,
- self.logs.getvalue())
- self.assertEqual(m_read_netlink_socket.call_count, 4)
-
- def test_invalid_msgtype_getlink(self, m_read_netlink_socket, m_socket):
- '''wait_for_media_disconnect_connect ignores GETLINK events.
-
- The first two messages are for oper down and up for RTM_GETLINK type
- which netlink module will ignore. The last 2 messages are RTM_NEWLINK
- with oper state down and up messages. Therefore the call count for
- m_read_netlink_socket has to be 4 ignoring first 2 messages
- of RTM_GETLINK
- '''
- ifname = "eth0"
- data_getlink_down = self._media_switch_data(
- ifname, RTM_GETLINK, OPER_DOWN
- )
- data_getlink_up = self._media_switch_data(
- ifname, RTM_GETLINK, OPER_UP
- )
- data_newlink_down = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_DOWN
- )
- data_newlink_up = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_UP
- )
- m_read_netlink_socket.side_effect = [
- data_getlink_down,
- data_getlink_up,
- data_newlink_down,
- data_newlink_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 4)
-
- def test_invalid_msgtype_setlink(self, m_read_netlink_socket, m_socket):
- '''wait_for_media_disconnect_connect ignores SETLINK events.
-
- The first two messages are for oper down and up for RTM_GETLINK type
- which it will ignore. 3rd and 4th messages are RTM_NEWLINK with down
- and up messages. This function should exit after 4th messages since it
- sees down->up scenario. So the call count for m_read_netlink_socket
- has to be 4 ignoring first 2 messages of RTM_GETLINK and
- last 2 messages of RTM_NEWLINK
- '''
- ifname = "eth0"
- data_setlink_down = self._media_switch_data(
- ifname, RTM_SETLINK, OPER_DOWN
- )
- data_setlink_up = self._media_switch_data(
- ifname, RTM_SETLINK, OPER_UP
- )
- data_newlink_down = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_DOWN
- )
- data_newlink_up = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_UP
- )
- m_read_netlink_socket.side_effect = [
- data_setlink_down,
- data_setlink_up,
- data_newlink_down,
- data_newlink_up,
- data_newlink_down,
- data_newlink_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 4)
-
- def test_netlink_invalid_switch_scenario(self, m_read_netlink_socket,
- m_socket):
- '''returns only if it receives UP event after a DOWN event'''
- ifname = "eth0"
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- data_op_dormant = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_DORMANT
- )
- data_op_notpresent = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_NOTPRESENT
- )
- data_op_lowerdown = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_LOWERLAYERDOWN
- )
- data_op_testing = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_TESTING
- )
- data_op_unknown = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_UNKNOWN
- )
- m_read_netlink_socket.side_effect = [
- data_op_up, data_op_up,
- data_op_dormant, data_op_up,
- data_op_notpresent, data_op_up,
- data_op_lowerdown, data_op_up,
- data_op_testing, data_op_up,
- data_op_unknown, data_op_up,
- data_op_down, data_op_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 14)
-
- def test_netlink_valid_inbetween_transitions(self, m_read_netlink_socket,
- m_socket):
- '''wait_for_media_disconnect_connect handles in between transitions'''
- ifname = "eth0"
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- data_op_dormant = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_DORMANT)
- data_op_unknown = self._media_switch_data(
- ifname, RTM_NEWLINK, OPER_UNKNOWN)
- m_read_netlink_socket.side_effect = [
- data_op_down, data_op_dormant,
- data_op_unknown, data_op_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 4)
-
- def test_netlink_invalid_operstate(self, m_read_netlink_socket, m_socket):
- '''wait_for_media_disconnect_connect should handle invalid operstates.
-
- The function should not fail and return even if it receives invalid
- operstates. It always should wait for down up sequence.
- '''
- ifname = "eth0"
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- data_op_invalid = self._media_switch_data(ifname, RTM_NEWLINK, 7)
- m_read_netlink_socket.side_effect = [
- data_op_invalid, data_op_up,
- data_op_down, data_op_invalid,
- data_op_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 5)
-
- def test_wait_invalid_socket(self, m_read_netlink_socket, m_socket):
- '''wait_for_media_disconnect_connect handle none netlink socket.'''
- socket = None
- ifname = "eth0"
- with self.assertRaises(AssertionError) as context:
- wait_for_media_disconnect_connect(socket, ifname)
- self.assertTrue('netlink socket is none' in str(context.exception))
-
- def test_wait_invalid_ifname(self, m_read_netlink_socket, m_socket):
- '''wait_for_media_disconnect_connect handle none interface name'''
- ifname = None
- with self.assertRaises(AssertionError) as context:
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertTrue('interface name is none' in str(context.exception))
- ifname = ""
- with self.assertRaises(AssertionError) as context:
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertTrue('interface name cannot be empty' in
- str(context.exception))
-
- def test_wait_invalid_rta_attr(self, m_read_netlink_socket, m_socket):
- ''' wait_for_media_disconnect_connect handles invalid rta data'''
- ifname = "eth0"
- data_invalid1 = self._media_switch_data(None, RTM_NEWLINK, OPER_DOWN)
- data_invalid2 = self._media_switch_data(ifname, RTM_NEWLINK, None)
- data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
- data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
- m_read_netlink_socket.side_effect = [
- data_invalid1, data_invalid2, data_op_down, data_op_up
- ]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 4)
-
- def test_read_multiple_netlink_msgs(self, m_read_netlink_socket, m_socket):
- '''Read multiple messages in single receive call'''
- ifname = "eth0"
- bytes = ifname.encode("utf-8")
- data = bytearray(96)
- struct.pack_into("=LHHLL", data, 0, 48, RTM_NEWLINK, 0, 0, 0)
- struct.pack_into(
- "HH4sHHc", data, RTATTR_START_OFFSET, 8, 3,
- bytes, 5, 16, int_to_bytes(OPER_DOWN)
- )
- struct.pack_into("=LHHLL", data, 48, 48, RTM_NEWLINK, 0, 0, 0)
- struct.pack_into(
- "HH4sHHc", data, 48 + RTATTR_START_OFFSET, 8,
- 3, bytes, 5, 16, int_to_bytes(OPER_UP)
- )
- m_read_netlink_socket.return_value = data
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 1)
-
- def test_read_partial_netlink_msgs(self, m_read_netlink_socket, m_socket):
- '''Read partial messages in receive call'''
- ifname = "eth0"
- bytes = ifname.encode("utf-8")
- data1 = bytearray(112)
- data2 = bytearray(32)
- struct.pack_into("=LHHLL", data1, 0, 48, RTM_NEWLINK, 0, 0, 0)
- struct.pack_into(
- "HH4sHHc", data1, RTATTR_START_OFFSET, 8, 3,
- bytes, 5, 16, int_to_bytes(OPER_DOWN)
- )
- struct.pack_into("=LHHLL", data1, 48, 48, RTM_NEWLINK, 0, 0, 0)
- struct.pack_into(
- "HH4sHHc", data1, 80, 8, 3, bytes, 5, 16, int_to_bytes(OPER_DOWN)
- )
- struct.pack_into("=LHHLL", data1, 96, 48, RTM_NEWLINK, 0, 0, 0)
- struct.pack_into(
- "HH4sHHc", data2, 16, 8, 3, bytes, 5, 16, int_to_bytes(OPER_UP)
- )
- m_read_netlink_socket.side_effect = [data1, data2]
- wait_for_media_disconnect_connect(m_socket, ifname)
- self.assertEqual(m_read_netlink_socket.call_count, 2)
diff --git a/cloudinit/sources/helpers/tests/test_openstack.py b/cloudinit/sources/helpers/tests/test_openstack.py
deleted file mode 100644
index 95fb9743..00000000
--- a/cloudinit/sources/helpers/tests/test_openstack.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-# ./cloudinit/sources/helpers/tests/test_openstack.py
-from unittest import mock
-
-from cloudinit.sources.helpers import openstack
-from cloudinit.tests import helpers as test_helpers
-
-
-@mock.patch(
- "cloudinit.net.is_openvswitch_internal_interface",
- mock.Mock(return_value=False)
-)
-class TestConvertNetJson(test_helpers.CiTestCase):
-
- def test_phy_types(self):
- """Verify the different known physical types are handled."""
- # network_data.json example from
- # https://docs.openstack.org/nova/latest/user/metadata.html
- mac0 = "fa:16:3e:9c:bf:3d"
- net_json = {
- "links": [
- {"ethernet_mac_address": mac0, "id": "tapcd9f6d46-4a",
- "mtu": None, "type": "bridge",
- "vif_id": "cd9f6d46-4a3a-43ab-a466-994af9db96fc"}
- ],
- "networks": [
- {"id": "network0", "link": "tapcd9f6d46-4a",
- "network_id": "99e88329-f20d-4741-9593-25bf07847b16",
- "type": "ipv4_dhcp"}
- ],
- "services": [{"address": "8.8.8.8", "type": "dns"}]
- }
- macs = {mac0: 'eth0'}
-
- expected = {
- 'version': 1,
- 'config': [
- {'mac_address': 'fa:16:3e:9c:bf:3d',
- 'mtu': None, 'name': 'eth0',
- 'subnets': [{'type': 'dhcp4'}],
- 'type': 'physical'},
- {'address': '8.8.8.8', 'type': 'nameserver'}]}
-
- for t in openstack.KNOWN_PHYSICAL_TYPES:
- net_json["links"][0]["type"] = t
- self.assertEqual(
- expected,
- openstack.convert_net_json(network_json=net_json,
- known_macs=macs))
diff --git a/cloudinit/sources/tests/__init__.py b/cloudinit/sources/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cloudinit/sources/tests/__init__.py
+++ /dev/null
diff --git a/cloudinit/sources/tests/test_init.py b/cloudinit/sources/tests/test_init.py
deleted file mode 100644
index ae09cb17..00000000
--- a/cloudinit/sources/tests/test_init.py
+++ /dev/null
@@ -1,771 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import copy
-import inspect
-import os
-import stat
-
-from cloudinit.event import EventScope, EventType
-from cloudinit.helpers import Paths
-from cloudinit import importer
-from cloudinit.sources import (
- EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE,
- METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource,
- canonical_cloud_id, redact_sensitive_keys)
-from cloudinit.tests.helpers import CiTestCase, mock
-from cloudinit.user_data import UserDataProcessor
-from cloudinit import util
-
-
-class DataSourceTestSubclassNet(DataSource):
-
- dsname = 'MyTestSubclass'
- url_max_wait = 55
-
- def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
- custom_userdata=None, get_data_retval=True):
- super(DataSourceTestSubclassNet, self).__init__(
- sys_cfg, distro, paths)
- self._custom_userdata = custom_userdata
- self._custom_metadata = custom_metadata
- self._get_data_retval = get_data_retval
-
- def _get_cloud_name(self):
- return 'SubclassCloudName'
-
- def _get_data(self):
- if self._custom_metadata:
- self.metadata = self._custom_metadata
- else:
- self.metadata = {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}
- if self._custom_userdata:
- self.userdata_raw = self._custom_userdata
- else:
- self.userdata_raw = 'userdata_raw'
- self.vendordata_raw = 'vendordata_raw'
- return self._get_data_retval
-
-
-class InvalidDataSourceTestSubclassNet(DataSource):
- pass
-
-
-class TestDataSource(CiTestCase):
-
- with_logs = True
- maxDiff = None
-
- def setUp(self):
- super(TestDataSource, self).setUp()
- self.sys_cfg = {'datasource': {'_undef': {'key1': False}}}
- self.distro = 'distrotest' # generally should be a Distro object
- self.paths = Paths({})
- self.datasource = DataSource(self.sys_cfg, self.distro, self.paths)
-
- def test_datasource_init(self):
- """DataSource initializes metadata attributes, ds_cfg and ud_proc."""
- self.assertEqual(self.paths, self.datasource.paths)
- self.assertEqual(self.sys_cfg, self.datasource.sys_cfg)
- self.assertEqual(self.distro, self.datasource.distro)
- self.assertIsNone(self.datasource.userdata)
- self.assertEqual({}, self.datasource.metadata)
- self.assertIsNone(self.datasource.userdata_raw)
- self.assertIsNone(self.datasource.vendordata)
- self.assertIsNone(self.datasource.vendordata_raw)
- self.assertEqual({'key1': False}, self.datasource.ds_cfg)
- self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor)
-
- def test_datasource_init_gets_ds_cfg_using_dsname(self):
- """Init uses DataSource.dsname for sourcing ds_cfg."""
- sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
- distro = 'distrotest' # generally should be a Distro object
- datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
- self.assertEqual({'key2': False}, datasource.ds_cfg)
-
- def test_str_is_classname(self):
- """The string representation of the datasource is the classname."""
- self.assertEqual('DataSource', str(self.datasource))
- self.assertEqual(
- 'DataSourceTestSubclassNet',
- str(DataSourceTestSubclassNet('', '', self.paths)))
-
- def test_datasource_get_url_params_defaults(self):
- """get_url_params default url config settings for the datasource."""
- params = self.datasource.get_url_params()
- self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait)
- self.assertEqual(params.timeout_seconds, self.datasource.url_timeout)
- self.assertEqual(params.num_retries, self.datasource.url_retries)
- self.assertEqual(params.sec_between_retries,
- self.datasource.url_sec_between_retries)
-
- def test_datasource_get_url_params_subclassed(self):
- """Subclasses can override get_url_params defaults."""
- sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
- distro = 'distrotest' # generally should be a Distro object
- datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
- expected = (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries, datasource.url_sec_between_retries)
- url_params = datasource.get_url_params()
- self.assertNotEqual(self.datasource.get_url_params(), url_params)
- self.assertEqual(expected, url_params)
-
- def test_datasource_get_url_params_ds_config_override(self):
- """Datasource configuration options can override url param defaults."""
- sys_cfg = {
- 'datasource': {
- 'MyTestSubclass': {
- 'max_wait': '1', 'timeout': '2',
- 'retries': '3', 'sec_between_retries': 4
- }}}
- datasource = DataSourceTestSubclassNet(
- sys_cfg, self.distro, self.paths)
- expected = (1, 2, 3, 4)
- url_params = datasource.get_url_params()
- self.assertNotEqual(
- (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries, datasource.url_sec_between_retries),
- url_params)
- self.assertEqual(expected, url_params)
-
- def test_datasource_get_url_params_is_zero_or_greater(self):
- """get_url_params ignores timeouts with a value below 0."""
- # Set an override that is below 0 which gets ignored.
- sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}}
- datasource = DataSource(sys_cfg, self.distro, self.paths)
- (_max_wait, timeout, _retries,
- _sec_between_retries) = datasource.get_url_params()
- self.assertEqual(0, timeout)
-
- def test_datasource_get_url_uses_defaults_on_errors(self):
- """On invalid system config values for url_params defaults are used."""
- # All invalid values should be logged
- sys_cfg = {'datasource': {
- '_undef': {
- 'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}}
- datasource = DataSource(sys_cfg, self.distro, self.paths)
- url_params = datasource.get_url_params()
- expected = (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries, datasource.url_sec_between_retries)
- self.assertEqual(expected, url_params)
- logs = self.logs.getvalue()
- expected_logs = [
- "Config max_wait 'nope' is not an int, using default '-1'",
- "Config timeout 'bug' is not an int, using default '10'",
- "Config retries 'nonint' is not an int, using default '5'",
- ]
- for log in expected_logs:
- self.assertIn(log, logs)
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_fallback_interface_is_discovered(self, m_get_fallback_nic):
- """The fallback_interface is discovered via find_fallback_nic."""
- m_get_fallback_nic.return_value = 'nic9'
- self.assertEqual('nic9', self.datasource.fallback_interface)
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic):
- """Log a warning when fallback_interface can not discover the nic."""
- self.datasource._cloud_name = 'MySupahCloud'
- m_get_fallback_nic.return_value = None # Couldn't discover nic
- self.assertIsNone(self.datasource.fallback_interface)
- self.assertEqual(
- 'WARNING: Did not find a fallback interface on MySupahCloud.\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic):
- """The fallback_interface is cached and won't be rediscovered."""
- self.datasource._fallback_interface = 'nic10'
- self.assertEqual('nic10', self.datasource.fallback_interface)
- m_get_fallback_nic.assert_not_called()
-
- def test__get_data_unimplemented(self):
- """Raise an error when _get_data is not implemented."""
- with self.assertRaises(NotImplementedError) as context_manager:
- self.datasource.get_data()
- self.assertIn(
- 'Subclasses of DataSource must implement _get_data',
- str(context_manager.exception))
- datasource2 = InvalidDataSourceTestSubclassNet(
- self.sys_cfg, self.distro, self.paths)
- with self.assertRaises(NotImplementedError) as context_manager:
- datasource2.get_data()
- self.assertIn(
- 'Subclasses of DataSource must implement _get_data',
- str(context_manager.exception))
-
- def test_get_data_calls_subclass__get_data(self):
- """Datasource.get_data uses the subclass' version of _get_data."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- self.assertEqual(
- {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'},
- datasource.metadata)
- self.assertEqual('userdata_raw', datasource.userdata_raw)
- self.assertEqual('vendordata_raw', datasource.vendordata_raw)
-
- def test_get_hostname_strips_local_hostname_without_domain(self):
- """Datasource.get_hostname strips metadata local-hostname of domain."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- self.assertEqual(
- 'test-subclass-hostname', datasource.metadata['local-hostname'])
- self.assertEqual('test-subclass-hostname', datasource.get_hostname())
- datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
- self.assertEqual('hostname', datasource.get_hostname())
-
- def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self):
- """Datasource.get_hostname with fqdn set gets qualified hostname."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
- self.assertEqual(
- 'hostname.my.domain.com', datasource.get_hostname(fqdn=True))
-
- def test_get_hostname_without_metadata_uses_system_hostname(self):
- """Datasource.gethostname runs util.get_hostname when no metadata."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- m_gethost.return_value = 'systemhostname.domain.com'
- m_fqdn.return_value = None # No maching fqdn in /etc/hosts
- self.assertEqual('systemhostname', datasource.get_hostname())
- self.assertEqual(
- 'systemhostname.domain.com',
- datasource.get_hostname(fqdn=True))
-
- def test_get_hostname_without_metadata_returns_none(self):
- """Datasource.gethostname returns None when metadata_only and no MD."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- self.assertIsNone(datasource.get_hostname(metadata_only=True))
- self.assertIsNone(
- datasource.get_hostname(fqdn=True, metadata_only=True))
- self.assertEqual([], m_gethost.call_args_list)
- self.assertEqual([], m_fqdn.call_args_list)
-
- def test_get_hostname_without_metadata_prefers_etc_hosts(self):
- """Datasource.gethostname prefers /etc/hosts to util.get_hostname."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- m_gethost.return_value = 'systemhostname.domain.com'
- m_fqdn.return_value = 'fqdnhostname.domain.com'
- self.assertEqual('fqdnhostname', datasource.get_hostname())
- self.assertEqual('fqdnhostname.domain.com',
- datasource.get_hostname(fqdn=True))
-
- def test_get_data_does_not_write_instance_data_on_failure(self):
- """get_data does not write INSTANCE_JSON_FILE on get_data False."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- get_data_retval=False)
- self.assertFalse(datasource.get_data())
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- self.assertFalse(
- os.path.exists(json_file), 'Found unexpected file %s' % json_file)
-
- def test_get_data_writes_json_instance_data_on_success(self):
- """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': REDACT_SENSITIVE_VALUE,
- 'sensitive_keys': ['merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'kernel_release': '5.4.0-24-generic',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'subplatform': 'unknown',
- 'variant': 'ubuntu'},
- 'ds': {
-
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}}}
- self.assertEqual(expected, util.load_json(content))
- file_stat = os.stat(json_file)
- self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
- self.assertEqual(expected, util.load_json(content))
-
- def test_get_data_writes_redacted_public_json_instance_data(self):
- """get_data writes redacted content to public INSTANCE_JSON_FILE."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': {
- 'cred1': 'sekret', 'cred2': 'othersekret'}}})
- self.assertCountEqual(
- ('merged_cfg', 'security-credentials',),
- datasource.sensitive_metadata_keys)
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- redacted = util.load_json(util.load_file(json_file))
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': REDACT_SENSITIVE_VALUE,
- 'sensitive_keys': [
- 'ds/meta_data/some/security-credentials', 'merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'kernel_release': '5.4.0-24-generic',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'subplatform': 'unknown',
- 'variant': 'ubuntu'},
- 'ds': {
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
- }
- self.assertCountEqual(expected, redacted)
- file_stat = os.stat(json_file)
- self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
-
- def test_get_data_writes_json_instance_data_sensitive(self):
- """
- get_data writes unmodified data to sensitive file as root-readonly.
- """
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': {
- 'cred1': 'sekret', 'cred2': 'othersekret'}}})
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
-
- self.assertCountEqual(
- ('merged_cfg', 'security-credentials',),
- datasource.sensitive_metadata_keys)
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
- content = util.load_file(sensitive_json_file)
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': {
- '_doc': (
- 'Merged cloud-init system config from '
- '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/'
- ),
- 'datasource': {'_undef': {'key1': False}}},
- 'sensitive_keys': [
- 'ds/meta_data/some/security-credentials', 'merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'kernel_release': '5.4.0-24-generic',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'subplatform': 'unknown',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'variant': 'ubuntu'},
- 'ds': {
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {
- 'security-credentials':
- {'cred1': 'sekret', 'cred2': 'othersekret'}}}}
- }
- self.assertCountEqual(expected, util.load_json(content))
- file_stat = os.stat(sensitive_json_file)
- self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
- self.assertEqual(expected, util.load_json(content))
-
- def test_get_data_handles_redacted_unserializable_content(self):
- """get_data warns unserializable content in INSTANCE_JSON_FILE."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- expected_metadata = {
- 'key1': 'val1',
- 'key2': {
- 'key2.1': "Warning: redacted unserializable type <class"
- " 'cloudinit.helpers.Paths'>"}}
- instance_json = util.load_json(content)
- self.assertEqual(
- expected_metadata, instance_json['ds']['meta_data'])
-
- def test_persist_instance_data_writes_ec2_metadata_when_set(self):
- """When ec2_metadata class attribute is set, persist to json."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- datasource.ec2_metadata = UNSET
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- instance_data = util.load_json(util.load_file(json_file))
- self.assertNotIn('ec2_metadata', instance_data['ds'])
- datasource.ec2_metadata = {'ec2stuff': 'is good'}
- datasource.persist_instance_data()
- instance_data = util.load_json(util.load_file(json_file))
- self.assertEqual(
- {'ec2stuff': 'is good'},
- instance_data['ds']['ec2_metadata'])
-
- def test_persist_instance_data_writes_network_json_when_set(self):
- """When network_data.json class attribute is set, persist to json."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- instance_data = util.load_json(util.load_file(json_file))
- self.assertNotIn('network_json', instance_data['ds'])
- datasource.network_json = {'network_json': 'is good'}
- datasource.persist_instance_data()
- instance_data = util.load_json(util.load_file(json_file))
- self.assertEqual(
- {'network_json': 'is good'},
- instance_data['ds']['network_json'])
-
- def test_get_data_base64encodes_unserializable_bytes(self):
- """On py3, get_data base64encodes any unserializable content."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
- self.assertTrue(datasource.get_data())
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- instance_json = util.load_json(content)
- self.assertCountEqual(
- ['ds/meta_data/key2/key2.1'],
- instance_json['base64_encoded_keys'])
- self.assertEqual(
- {'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
- instance_json['ds']['meta_data'])
-
- def test_get_hostname_subclass_support(self):
- """Validate get_hostname signature on all subclasses of DataSource."""
- base_args = inspect.getfullargspec(DataSource.get_hostname)
- # Import all DataSource subclasses so we can inspect them.
- modules = util.find_modules(os.path.dirname(os.path.dirname(__file__)))
- for _loc, name in modules.items():
- mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], [])
- if mod_locs:
- importer.import_module(mod_locs[0])
- for child in DataSource.__subclasses__():
- if 'Test' in child.dsname:
- continue
- self.assertEqual(
- base_args,
- inspect.getfullargspec(child.get_hostname),
- '%s does not implement DataSource.get_hostname params'
- % child)
- for grandchild in child.__subclasses__():
- self.assertEqual(
- base_args,
- inspect.getfullargspec(grandchild.get_hostname),
- '%s does not implement DataSource.get_hostname params'
- % grandchild)
-
- def test_clear_cached_attrs_resets_cached_attr_class_attributes(self):
- """Class attributes listed in cached_attr_defaults are reset."""
- count = 0
- # Setup values for all cached class attributes
- for attr, value in self.datasource.cached_attr_defaults:
- setattr(self.datasource, attr, count)
- count += 1
- self.datasource._dirty_cache = True
- self.datasource.clear_cached_attrs()
- for attr, value in self.datasource.cached_attr_defaults:
- self.assertEqual(value, getattr(self.datasource, attr))
-
- def test_clear_cached_attrs_noops_on_clean_cache(self):
- """Class attributes listed in cached_attr_defaults are reset."""
- count = 0
- # Setup values for all cached class attributes
- for attr, _ in self.datasource.cached_attr_defaults:
- setattr(self.datasource, attr, count)
- count += 1
- self.datasource._dirty_cache = False # Fake clean cache
- self.datasource.clear_cached_attrs()
- count = 0
- for attr, _ in self.datasource.cached_attr_defaults:
- self.assertEqual(count, getattr(self.datasource, attr))
- count += 1
-
- def test_clear_cached_attrs_skips_non_attr_class_attributes(self):
- """Skip any cached_attr_defaults which aren't class attributes."""
- self.datasource._dirty_cache = True
- self.datasource.clear_cached_attrs()
- for attr in ('ec2_metadata', 'network_json'):
- self.assertFalse(hasattr(self.datasource, attr))
-
- def test_clear_cached_attrs_of_custom_attrs(self):
- """Custom attr_values can be passed to clear_cached_attrs."""
- self.datasource._dirty_cache = True
- cached_attr_name = self.datasource.cached_attr_defaults[0][0]
- setattr(self.datasource, cached_attr_name, 'himom')
- self.datasource.myattr = 'orig'
- self.datasource.clear_cached_attrs(
- attr_defaults=(('myattr', 'updated'),))
- self.assertEqual('himom', getattr(self.datasource, cached_attr_name))
- self.assertEqual('updated', self.datasource.myattr)
-
- @mock.patch.dict(DataSource.default_update_events, {
- EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
- @mock.patch.dict(DataSource.supported_update_events, {
- EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
- def test_update_metadata_only_acts_on_supported_update_events(self):
- """update_metadata_if_supported wont get_data on unsupported events."""
- self.assertEqual(
- {EventScope.NETWORK: set([EventType.BOOT_NEW_INSTANCE])},
- self.datasource.default_update_events
- )
-
- def fake_get_data():
- raise Exception('get_data should not be called')
-
- self.datasource.get_data = fake_get_data
- self.assertFalse(
- self.datasource.update_metadata_if_supported(
- source_event_types=[EventType.BOOT]))
-
- @mock.patch.dict(DataSource.supported_update_events, {
- EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
- def test_update_metadata_returns_true_on_supported_update_event(self):
- """update_metadata_if_supported returns get_data on supported events"""
- def fake_get_data():
- return True
-
- self.datasource.get_data = fake_get_data
- self.datasource._network_config = 'something'
- self.datasource._dirty_cache = True
- self.assertTrue(
- self.datasource.update_metadata_if_supported(
- source_event_types=[
- EventType.BOOT, EventType.BOOT_NEW_INSTANCE]))
- self.assertEqual(UNSET, self.datasource._network_config)
-
- self.assertIn(
- "DEBUG: Update datasource metadata and network config due to"
- " events: boot-new-instance",
- self.logs.getvalue()
- )
-
-
-class TestRedactSensitiveData(CiTestCase):
-
- def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
- """When sensitive_keys is absent or empty from metadata do nothing."""
- md = {'my': 'data'}
- self.assertEqual(
- md, redact_sensitive_keys(md, redact_value='redacted'))
- md['sensitive_keys'] = []
- self.assertEqual(
- md, redact_sensitive_keys(md, redact_value='redacted'))
-
- def test_redact_sensitive_data_redacts_exact_match_name(self):
- """Only exact matched sensitive_keys are redacted from metadata."""
- md = {'sensitive_keys': ['md/secure'],
- 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
- secure_md = copy.deepcopy(md)
- secure_md['md']['secure'] = 'redacted'
- self.assertEqual(
- secure_md,
- redact_sensitive_keys(md, redact_value='redacted'))
-
- def test_redact_sensitive_data_does_redacts_with_default_string(self):
- """When redact_value is absent, REDACT_SENSITIVE_VALUE is used."""
- md = {'sensitive_keys': ['md/secure'],
- 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
- secure_md = copy.deepcopy(md)
- secure_md['md']['secure'] = 'redacted for non-root user'
- self.assertEqual(
- secure_md,
- redact_sensitive_keys(md))
-
-
-class TestCanonicalCloudID(CiTestCase):
-
- def test_cloud_id_returns_platform_on_unknowns(self):
- """When region and cloud_name are unknown, return platform."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
- region=METADATA_UNKNOWN,
- platform='platform'))
-
- def test_cloud_id_returns_platform_on_none(self):
- """When region and cloud_name are unknown, return platform."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=None,
- region=None,
- platform='platform'))
-
- def test_cloud_id_returns_cloud_name_on_unknown_region(self):
- """When region is unknown, return cloud_name."""
- for region in (None, METADATA_UNKNOWN):
- self.assertEqual(
- 'cloudname',
- canonical_cloud_id(cloud_name='cloudname',
- region=region,
- platform='platform'))
-
- def test_cloud_id_returns_platform_on_unknown_cloud_name(self):
- """When region is set but cloud_name is unknown return cloud_name."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
- region='region',
- platform='platform'))
-
- def test_cloud_id_aws_based_on_region_and_cloud_name(self):
- """When cloud_name is aws, return proper cloud-id based on region."""
- self.assertEqual(
- 'aws-china',
- canonical_cloud_id(cloud_name='aws',
- region='cn-north-1',
- platform='platform'))
- self.assertEqual(
- 'aws',
- canonical_cloud_id(cloud_name='aws',
- region='us-east-1',
- platform='platform'))
- self.assertEqual(
- 'aws-gov',
- canonical_cloud_id(cloud_name='aws',
- region='us-gov-1',
- platform='platform'))
- self.assertEqual( # Overrideen non-aws cloud_name is returned
- '!aws',
- canonical_cloud_id(cloud_name='!aws',
- region='us-gov-1',
- platform='platform'))
-
- def test_cloud_id_azure_based_on_region_and_cloud_name(self):
- """Report cloud-id when cloud_name is azure and region is in china."""
- self.assertEqual(
- 'azure-china',
- canonical_cloud_id(cloud_name='azure',
- region='chinaeast',
- platform='platform'))
- self.assertEqual(
- 'azure',
- canonical_cloud_id(cloud_name='azure',
- region='!chinaeast',
- platform='platform'))
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/sources/tests/test_lxd.py b/cloudinit/sources/tests/test_lxd.py
deleted file mode 100644
index a6e51f3b..00000000
--- a/cloudinit/sources/tests/test_lxd.py
+++ /dev/null
@@ -1,376 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from collections import namedtuple
-from copy import deepcopy
-import json
-import re
-import stat
-from unittest import mock
-import yaml
-
-import pytest
-
-from cloudinit.sources import (
- DataSourceLXD as lxd, InvalidMetaDataException, UNSET
-)
-DS_PATH = "cloudinit.sources.DataSourceLXD."
-
-
-LStatResponse = namedtuple("lstatresponse", "st_mode")
-
-
-NETWORK_V1 = {
- "version": 1,
- "config": [
- {
- "type": "physical", "name": "eth0",
- "subnets": [{"type": "dhcp", "control": "auto"}]
- }
- ]
-}
-
-
-def _add_network_v1_device(devname) -> dict:
- """Helper to inject device name into default network v1 config."""
- network_cfg = deepcopy(NETWORK_V1)
- network_cfg["config"][0]["name"] = devname
- return network_cfg
-
-
-LXD_V1_METADATA = {
- "meta-data": "instance-id: my-lxc\nlocal-hostname: my-lxc\n\n",
- "network-config": NETWORK_V1,
- "user-data": "#cloud-config\npackages: [sl]\n",
- "vendor-data": "#cloud-config\nruncmd: ['echo vendor-data']\n",
- "config": {
- "user.user-data":
- "instance-id: my-lxc\nlocal-hostname: my-lxc\n\n",
- "user.vendor-data":
- "#cloud-config\nruncmd: ['echo vendor-data']\n",
- "user.network-config": yaml.safe_dump(NETWORK_V1),
- }
-}
-
-
-@pytest.fixture
-def lxd_metadata():
- return LXD_V1_METADATA
-
-
-@pytest.yield_fixture
-def lxd_ds(request, paths, lxd_metadata):
- """
- Return an instantiated DataSourceLXD.
-
- This also performs the mocking required for the default test case:
- * ``is_platform_viable`` returns True,
- * ``read_metadata`` returns ``LXD_V1_METADATA``
-
- (This uses the paths fixture for the required helpers.Paths object)
- """
- with mock.patch(DS_PATH + "is_platform_viable", return_value=True):
- with mock.patch(DS_PATH + "read_metadata", return_value=lxd_metadata):
- yield lxd.DataSourceLXD(
- sys_cfg={}, distro=mock.Mock(), paths=paths
- )
-
-
-class TestGenerateFallbackNetworkConfig:
-
- @pytest.mark.parametrize(
- "uname_machine,systemd_detect_virt,expected", (
- # None for systemd_detect_virt returns None from which
- ({}, None, NETWORK_V1),
- ({}, None, NETWORK_V1),
- ("anything", "lxc\n", NETWORK_V1),
- # `uname -m` on kvm determines devname
- ("x86_64", "kvm\n", _add_network_v1_device("enp5s0")),
- ("ppc64le", "kvm\n", _add_network_v1_device("enp0s5")),
- ("s390x", "kvm\n", _add_network_v1_device("enc9"))
- )
- )
- @mock.patch(DS_PATH + "util.system_info")
- @mock.patch(DS_PATH + "subp.subp")
- @mock.patch(DS_PATH + "subp.which")
- def test_net_v2_based_on_network_mode_virt_type_and_uname_machine(
- self,
- m_which,
- m_subp,
- m_system_info,
- uname_machine,
- systemd_detect_virt,
- expected,
- ):
- """Return network config v2 based on uname -m, systemd-detect-virt."""
- if systemd_detect_virt is None:
- m_which.return_value = None
- m_system_info.return_value = {"uname": ["", "", "", "", uname_machine]}
- m_subp.return_value = (systemd_detect_virt, "")
- assert expected == lxd.generate_fallback_network_config()
- if systemd_detect_virt is None:
- assert 0 == m_subp.call_count
- assert 0 == m_system_info.call_count
- else:
- assert [
- mock.call(["systemd-detect-virt"])
- ] == m_subp.call_args_list
- if systemd_detect_virt != "kvm\n":
- assert 0 == m_system_info.call_count
- else:
- assert 1 == m_system_info.call_count
-
-
-class TestDataSourceLXD:
- def test_platform_info(self, lxd_ds):
- assert "LXD" == lxd_ds.dsname
- assert "lxd" == lxd_ds.cloud_name
- assert "lxd" == lxd_ds.platform_type
-
- def test_subplatform(self, lxd_ds):
- assert "LXD socket API v. 1.0 (/dev/lxd/sock)" == lxd_ds.subplatform
-
- def test__get_data(self, lxd_ds):
- """get_data calls read_metadata, setting appropiate instance attrs."""
- assert UNSET == lxd_ds._crawled_metadata
- assert UNSET == lxd_ds._network_config
- assert None is lxd_ds.userdata_raw
- assert True is lxd_ds._get_data()
- assert LXD_V1_METADATA == lxd_ds._crawled_metadata
- # network-config is dumped from YAML
- assert NETWORK_V1 == lxd_ds._network_config
- # Any user-data and vendor-data are saved as raw
- assert LXD_V1_METADATA["user-data"] == lxd_ds.userdata_raw
- assert LXD_V1_METADATA["vendor-data"] == lxd_ds.vendordata_raw
-
-
-class TestIsPlatformViable:
- @pytest.mark.parametrize(
- "exists,lstat_mode,expected", (
- (False, None, False),
- (True, stat.S_IFREG, False),
- (True, stat.S_IFSOCK, True),
- )
- )
- @mock.patch(DS_PATH + "os.lstat")
- @mock.patch(DS_PATH + "os.path.exists")
- def test_expected_viable(
- self, m_exists, m_lstat, exists, lstat_mode, expected
- ):
- """Return True only when LXD_SOCKET_PATH exists and is a socket."""
- m_exists.return_value = exists
- m_lstat.return_value = LStatResponse(lstat_mode)
- assert expected is lxd.is_platform_viable()
- m_exists.assert_has_calls([mock.call(lxd.LXD_SOCKET_PATH)])
- if exists:
- m_lstat.assert_has_calls([mock.call(lxd.LXD_SOCKET_PATH)])
- else:
- assert 0 == m_lstat.call_count
-
-
-class TestReadMetadata:
- @pytest.mark.parametrize(
- "url_responses,expected,logs", (
- ( # Assert non-JSON format from config route
- {
- "http://lxd/1.0/meta-data": "local-hostname: md\n",
- "http://lxd/1.0/config": "[NOT_JSON",
- },
- InvalidMetaDataException(
- "Unable to determine cloud-init config from"
- " http://lxd/1.0/config. Expected JSON but found:"
- " [NOT_JSON"),
- ["[GET] [HTTP:200] http://lxd/1.0/meta-data",
- "[GET] [HTTP:200] http://lxd/1.0/config"],
- ),
- ( # Assert success on just meta-data
- {
- "http://lxd/1.0/meta-data": "local-hostname: md\n",
- "http://lxd/1.0/config": "[]",
- },
- {
- "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION,
- "config": {}, "meta-data": "local-hostname: md\n"
- },
- ["[GET] [HTTP:200] http://lxd/1.0/meta-data",
- "[GET] [HTTP:200] http://lxd/1.0/config"],
- ),
- ( # Assert 404s for config routes log skipping
- {
- "http://lxd/1.0/meta-data": "local-hostname: md\n",
- "http://lxd/1.0/config":
- '["/1.0/config/user.custom1",'
- ' "/1.0/config/user.meta-data",'
- ' "/1.0/config/user.network-config",'
- ' "/1.0/config/user.user-data",'
- ' "/1.0/config/user.vendor-data"]',
- "http://lxd/1.0/config/user.custom1": "custom1",
- "http://lxd/1.0/config/user.meta-data": "", # 404
- "http://lxd/1.0/config/user.network-config": "net-config",
- "http://lxd/1.0/config/user.user-data": "", # 404
- "http://lxd/1.0/config/user.vendor-data": "", # 404
- },
- {
- "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION,
- "config": {
- "user.custom1": "custom1", # Not promoted
- "user.network-config": "net-config",
- },
- "meta-data": "local-hostname: md\n",
- "network-config": "net-config",
- },
- [
- "Skipping http://lxd/1.0/config/user.vendor-data on"
- " [HTTP:404]",
- "Skipping http://lxd/1.0/config/user.meta-data on"
- " [HTTP:404]",
- "Skipping http://lxd/1.0/config/user.user-data on"
- " [HTTP:404]",
- "[GET] [HTTP:200] http://lxd/1.0/config",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.custom1",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/user.network-config",
- ],
- ),
- ( # Assert all CONFIG_KEY_ALIASES promoted to top-level keys
- {
- "http://lxd/1.0/meta-data": "local-hostname: md\n",
- "http://lxd/1.0/config":
- '["/1.0/config/user.custom1",'
- ' "/1.0/config/user.meta-data",'
- ' "/1.0/config/user.network-config",'
- ' "/1.0/config/user.user-data",'
- ' "/1.0/config/user.vendor-data"]',
- "http://lxd/1.0/config/user.custom1": "custom1",
- "http://lxd/1.0/config/user.meta-data": "meta-data",
- "http://lxd/1.0/config/user.network-config": "net-config",
- "http://lxd/1.0/config/user.user-data": "user-data",
- "http://lxd/1.0/config/user.vendor-data": "vendor-data",
- },
- {
- "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION,
- "config": {
- "user.custom1": "custom1", # Not promoted
- "user.meta-data": "meta-data",
- "user.network-config": "net-config",
- "user.user-data": "user-data",
- "user.vendor-data": "vendor-data",
- },
- "meta-data": "local-hostname: md\n",
- "network-config": "net-config",
- "user-data": "user-data",
- "vendor-data": "vendor-data",
- },
- [
- "[GET] [HTTP:200] http://lxd/1.0/meta-data",
- "[GET] [HTTP:200] http://lxd/1.0/config",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.custom1",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.meta-data",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/user.network-config",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.user-data",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.vendor-data",
- ],
- ),
- ( # Assert cloud-init.* config key values prefered over user.*
- {
- "http://lxd/1.0/meta-data": "local-hostname: md\n",
- "http://lxd/1.0/config":
- '["/1.0/config/user.meta-data",'
- ' "/1.0/config/user.network-config",'
- ' "/1.0/config/user.user-data",'
- ' "/1.0/config/user.vendor-data",'
- ' "/1.0/config/cloud-init.network-config",'
- ' "/1.0/config/cloud-init.user-data",'
- ' "/1.0/config/cloud-init.vendor-data"]',
- "http://lxd/1.0/config/user.meta-data": "user.meta-data",
- "http://lxd/1.0/config/user.network-config":
- "user.network-config",
- "http://lxd/1.0/config/user.user-data": "user.user-data",
- "http://lxd/1.0/config/user.vendor-data":
- "user.vendor-data",
- "http://lxd/1.0/config/cloud-init.meta-data":
- "cloud-init.meta-data",
- "http://lxd/1.0/config/cloud-init.network-config":
- "cloud-init.network-config",
- "http://lxd/1.0/config/cloud-init.user-data":
- "cloud-init.user-data",
- "http://lxd/1.0/config/cloud-init.vendor-data":
- "cloud-init.vendor-data",
- },
- {
- "_metadata_api_version": lxd.LXD_SOCKET_API_VERSION,
- "config": {
- "user.meta-data": "user.meta-data",
- "user.network-config": "user.network-config",
- "user.user-data": "user.user-data",
- "user.vendor-data": "user.vendor-data",
- "cloud-init.network-config":
- "cloud-init.network-config",
- "cloud-init.user-data": "cloud-init.user-data",
- "cloud-init.vendor-data":
- "cloud-init.vendor-data",
- },
- "meta-data": "local-hostname: md\n",
- "network-config": "cloud-init.network-config",
- "user-data": "cloud-init.user-data",
- "vendor-data": "cloud-init.vendor-data",
- },
- [
- "[GET] [HTTP:200] http://lxd/1.0/meta-data",
- "[GET] [HTTP:200] http://lxd/1.0/config",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.meta-data",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/user.network-config",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.user-data",
- "[GET] [HTTP:200] http://lxd/1.0/config/user.vendor-data",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/cloud-init.network-config",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/cloud-init.user-data",
- "[GET] [HTTP:200]"
- " http://lxd/1.0/config/cloud-init.vendor-data",
- "Ignoring LXD config user.user-data in favor of"
- " cloud-init.user-data value.",
- "Ignoring LXD config user.network-config in favor of"
- " cloud-init.network-config value.",
- "Ignoring LXD config user.vendor-data in favor of"
- " cloud-init.vendor-data value.",
- ],
- ),
- )
- )
- @mock.patch.object(lxd.requests.Session, 'get')
- def test_read_metadata_handles_unexpected_content_or_http_status(
- self, session_get, url_responses, expected, logs, caplog
- ):
- """read_metadata handles valid and invalid content and status codes."""
-
- def fake_get(url):
- """Mock Response json, ok, status_code, text from url_responses."""
- m_resp = mock.MagicMock()
- content = url_responses.get(url, '')
- m_resp.json.side_effect = lambda: json.loads(content)
- if content:
- mock_ok = mock.PropertyMock(return_value=True)
- mock_status_code = mock.PropertyMock(return_value=200)
- else:
- mock_ok = mock.PropertyMock(return_value=False)
- mock_status_code = mock.PropertyMock(return_value=404)
- type(m_resp).ok = mock_ok
- type(m_resp).status_code = mock_status_code
- mock_text = mock.PropertyMock(return_value=content)
- type(m_resp).text = mock_text
- return m_resp
-
- session_get.side_effect = fake_get
-
- if isinstance(expected, Exception):
- with pytest.raises(type(expected), match=re.escape(str(expected))):
- lxd.read_metadata()
- else:
- assert expected == lxd.read_metadata()
- caplogs = caplog.text
- for log in logs:
- assert log in caplogs
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/sources/tests/test_oracle.py b/cloudinit/sources/tests/test_oracle.py
deleted file mode 100644
index 5f608cbb..00000000
--- a/cloudinit/sources/tests/test_oracle.py
+++ /dev/null
@@ -1,797 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import base64
-import copy
-import json
-from contextlib import ExitStack
-from unittest import mock
-
-import pytest
-
-from cloudinit.sources import DataSourceOracle as oracle
-from cloudinit.sources import NetworkConfigSource
-from cloudinit.sources.DataSourceOracle import OpcMetadata
-from cloudinit.tests import helpers as test_helpers
-from cloudinit.url_helper import UrlError
-
-DS_PATH = "cloudinit.sources.DataSourceOracle"
-
-# `curl -L http://169.254.169.254/opc/v1/vnics/` on a Oracle Bare Metal Machine
-# with a secondary VNIC attached (vnicId truncated for Python line length)
-OPC_BM_SECONDARY_VNIC_RESPONSE = """\
-[ {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtyvcucqkhdqmgjszebxe4hrb!!TRUNCATED||",
- "privateIp" : "10.0.0.8",
- "vlanTag" : 0,
- "macAddr" : "90:e2:ba:d4:f1:68",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24",
- "nicIndex" : 0
-}, {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtfmkxjdy2sqidndiwrsg63zf!!TRUNCATED||",
- "privateIp" : "10.0.4.5",
- "vlanTag" : 1,
- "macAddr" : "02:00:17:05:CF:51",
- "virtualRouterIp" : "10.0.4.1",
- "subnetCidrBlock" : "10.0.4.0/24",
- "nicIndex" : 0
-} ]"""
-
-# `curl -L http://169.254.169.254/opc/v1/vnics/` on a Oracle Virtual Machine
-# with a secondary VNIC attached
-OPC_VM_SECONDARY_VNIC_RESPONSE = """\
-[ {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtch72z5pd76cc2636qeqh7z_truncated",
- "privateIp" : "10.0.0.230",
- "vlanTag" : 1039,
- "macAddr" : "02:00:17:05:D1:DB",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24"
-}, {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljt4iew3gwmvrwrhhf3bp5drj_truncated",
- "privateIp" : "10.0.0.231",
- "vlanTag" : 1041,
- "macAddr" : "00:00:17:02:2B:B1",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24"
-} ]"""
-
-
-# Fetched with `curl http://169.254.169.254/opc/v1/instance/` (and then
-# truncated for line length)
-OPC_V2_METADATA = """\
-{
- "availabilityDomain" : "qIZq:PHX-AD-1",
- "faultDomain" : "FAULT-DOMAIN-2",
- "compartmentId" : "ocid1.tenancy.oc1..aaaaaaaao7f7cccogqrg5emjxkxmTRUNCATED",
- "displayName" : "instance-20200320-1400",
- "hostname" : "instance-20200320-1400",
- "id" : "ocid1.instance.oc1.phx.anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
- "image" : "ocid1.image.oc1.phx.aaaaaaaagmkn4gdhvvx24kiahh2b2qchsicTRUNCATED",
- "metadata" : {
- "ssh_authorized_keys" : "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
- "user_data" : "IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"
- },
- "region" : "phx",
- "canonicalRegionName" : "us-phoenix-1",
- "ociAdName" : "phx-ad-3",
- "shape" : "VM.Standard2.1",
- "state" : "Running",
- "timeCreated" : 1584727285318,
- "agentConfig" : {
- "monitoringDisabled" : true,
- "managementDisabled" : true
- }
-}"""
-
-# Just a small meaningless change to differentiate the two metadatas
-OPC_V1_METADATA = OPC_V2_METADATA.replace("ocid1.instance", "ocid2.instance")
-
-
-@pytest.fixture
-def metadata_version():
- return 2
-
-
-@pytest.yield_fixture
-def oracle_ds(request, fixture_utils, paths, metadata_version):
- """
- Return an instantiated DataSourceOracle.
-
- This also performs the mocking required for the default test case:
- * ``_read_system_uuid`` returns something,
- * ``_is_platform_viable`` returns True,
- * ``_is_iscsi_root`` returns True (the simpler code path),
- * ``read_opc_metadata`` returns ``OPC_V1_METADATA``
-
- (This uses the paths fixture for the required helpers.Paths object, and the
- fixture_utils fixture for fetching markers.)
- """
- sys_cfg = fixture_utils.closest_marker_first_arg_or(
- request, "ds_sys_cfg", mock.MagicMock()
- )
- metadata = OpcMetadata(metadata_version, json.loads(OPC_V2_METADATA), None)
- with mock.patch(DS_PATH + "._read_system_uuid", return_value="someuuid"):
- with mock.patch(DS_PATH + "._is_platform_viable", return_value=True):
- with mock.patch(DS_PATH + "._is_iscsi_root", return_value=True):
- with mock.patch(
- DS_PATH + ".read_opc_metadata",
- return_value=metadata,
- ):
- yield oracle.DataSourceOracle(
- sys_cfg=sys_cfg, distro=mock.Mock(), paths=paths,
- )
-
-
-class TestDataSourceOracle:
- def test_platform_info(self, oracle_ds):
- assert "oracle" == oracle_ds.cloud_name
- assert "oracle" == oracle_ds.platform_type
-
- def test_subplatform_before_fetch(self, oracle_ds):
- assert 'unknown' == oracle_ds.subplatform
-
- def test_platform_info_after_fetch(self, oracle_ds):
- oracle_ds._get_data()
- assert 'metadata (http://169.254.169.254/opc/v2/)' == \
- oracle_ds.subplatform
-
- @pytest.mark.parametrize('metadata_version', [1])
- def test_v1_platform_info_after_fetch(self, oracle_ds):
- oracle_ds._get_data()
- assert 'metadata (http://169.254.169.254/opc/v1/)' == \
- oracle_ds.subplatform
-
- def test_secondary_nics_disabled_by_default(self, oracle_ds):
- assert not oracle_ds.ds_cfg["configure_secondary_nics"]
-
- @pytest.mark.ds_sys_cfg(
- {"datasource": {"Oracle": {"configure_secondary_nics": True}}}
- )
- def test_sys_cfg_can_enable_configure_secondary_nics(self, oracle_ds):
- assert oracle_ds.ds_cfg["configure_secondary_nics"]
-
-
-class TestIsPlatformViable(test_helpers.CiTestCase):
- @mock.patch(DS_PATH + ".dmi.read_dmi_data",
- return_value=oracle.CHASSIS_ASSET_TAG)
- def test_expected_viable(self, m_read_dmi_data):
- """System with known chassis tag is viable."""
- self.assertTrue(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
- @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value=None)
- def test_expected_not_viable_dmi_data_none(self, m_read_dmi_data):
- """System without known chassis tag is not viable."""
- self.assertFalse(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
- @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value="LetsGoCubs")
- def test_expected_not_viable_other(self, m_read_dmi_data):
- """System with unnown chassis tag is not viable."""
- self.assertFalse(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
-
-@mock.patch(
- "cloudinit.net.is_openvswitch_internal_interface",
- mock.Mock(return_value=False)
-)
-class TestNetworkConfigFromOpcImds:
- def test_no_secondary_nics_does_not_mutate_input(self, oracle_ds):
- oracle_ds._vnics_data = [{}]
- # We test this by using in a non-dict to ensure that no dict
- # operations are used; failure would be seen as exceptions
- oracle_ds._network_config = object()
- oracle_ds._add_network_config_from_opc_imds()
-
- def test_bare_metal_machine_skipped(self, oracle_ds, caplog):
- # nicIndex in the first entry indicates a bare metal machine
- oracle_ds._vnics_data = json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)
- # We test this by using a non-dict to ensure that no dict
- # operations are used
- oracle_ds._network_config = object()
- oracle_ds._add_network_config_from_opc_imds()
- assert 'bare metal machine' in caplog.text
-
- def test_missing_mac_skipped(self, oracle_ds, caplog):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
-
- oracle_ds._network_config = {
- 'version': 1, 'config': [{'primary': 'nic'}]
- }
- with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}):
- oracle_ds._add_network_config_from_opc_imds()
-
- assert 1 == len(oracle_ds.network_config['config'])
- assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \
- caplog.text
-
- def test_missing_mac_skipped_v2(self, oracle_ds, caplog):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
-
- oracle_ds._network_config = {
- 'version': 2, 'ethernets': {'primary': {'nic': {}}}
- }
- with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}):
- oracle_ds._add_network_config_from_opc_imds()
-
- assert 1 == len(oracle_ds.network_config['ethernets'])
- assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \
- caplog.text
-
- def test_secondary_nic(self, oracle_ds):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
- oracle_ds._network_config = {
- 'version': 1, 'config': [{'primary': 'nic'}]
- }
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- with mock.patch(DS_PATH + ".get_interfaces_by_mac",
- return_value={mac_addr: nic_name}):
- oracle_ds._add_network_config_from_opc_imds()
-
- # The input is mutated
- assert 2 == len(oracle_ds.network_config['config'])
-
- secondary_nic_cfg = oracle_ds.network_config['config'][1]
- assert nic_name == secondary_nic_cfg['name']
- assert 'physical' == secondary_nic_cfg['type']
- assert mac_addr == secondary_nic_cfg['mac_address']
- assert 9000 == secondary_nic_cfg['mtu']
-
- assert 1 == len(secondary_nic_cfg['subnets'])
- subnet_cfg = secondary_nic_cfg['subnets'][0]
- # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE
- assert '10.0.0.231' == subnet_cfg['address']
-
- def test_secondary_nic_v2(self, oracle_ds):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
- oracle_ds._network_config = {
- 'version': 2, 'ethernets': {'primary': {'nic': {}}}
- }
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- with mock.patch(DS_PATH + ".get_interfaces_by_mac",
- return_value={mac_addr: nic_name}):
- oracle_ds._add_network_config_from_opc_imds()
-
- # The input is mutated
- assert 2 == len(oracle_ds.network_config['ethernets'])
-
- secondary_nic_cfg = oracle_ds.network_config['ethernets']['ens3']
- assert secondary_nic_cfg['dhcp4'] is False
- assert secondary_nic_cfg['dhcp6'] is False
- assert mac_addr == secondary_nic_cfg['match']['macaddress']
- assert 9000 == secondary_nic_cfg['mtu']
-
- assert 1 == len(secondary_nic_cfg['addresses'])
- # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE
- assert '10.0.0.231' == secondary_nic_cfg['addresses'][0]
-
-
-class TestNetworkConfigFiltersNetFailover(test_helpers.CiTestCase):
-
- def setUp(self):
- super(TestNetworkConfigFiltersNetFailover, self).setUp()
- self.add_patch(DS_PATH + '.get_interfaces_by_mac',
- 'm_get_interfaces_by_mac')
- self.add_patch(DS_PATH + '.is_netfail_master', 'm_netfail_master')
-
- def test_ignore_bogus_network_config(self):
- netcfg = {'something': 'here'}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
-
- def test_ignore_network_config_unknown_versions(self):
- netcfg = {'something': 'here', 'version': 3}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
-
- def test_checks_v1_type_physical_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_name, 'mac_address': mac_addr,
- 'subnets': [{'type': 'dhcp4'}]}]}
- passed_netcfg = copy.copy(netcfg)
- self.m_netfail_master.return_value = False
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual([mock.call(nic_name)],
- self.m_netfail_master.call_args_list)
-
- def test_checks_v1_skips_non_phys_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'bond0'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'bond', 'name': nic_name, 'mac_address': mac_addr,
- 'subnets': [{'type': 'dhcp4'}]}]}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual(0, self.m_netfail_master.call_count)
-
- def test_removes_master_mac_property_v1(self):
- nic_master, mac_master = 'ens3', self.random_string()
- nic_other, mac_other = 'ens7', self.random_string()
- nic_extra, mac_extra = 'enp0s1f2', self.random_string()
- self.m_get_interfaces_by_mac.return_value = {
- mac_master: nic_master,
- mac_other: nic_other,
- mac_extra: nic_extra,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_master,
- 'mac_address': mac_master},
- {'type': 'physical', 'name': nic_other, 'mac_address': mac_other},
- {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra},
- ]}
-
- def _is_netfail_master(iface):
- if iface == 'ens3':
- return True
- return False
- self.m_netfail_master.side_effect = _is_netfail_master
- expected_cfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_master},
- {'type': 'physical', 'name': nic_other, 'mac_address': mac_other},
- {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra},
- ]}
- oracle._ensure_netfailover_safe(netcfg)
- self.assertEqual(expected_cfg, netcfg)
-
- def test_checks_v2_type_ethernet_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 2, 'ethernets': {
- nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name,
- 'match': {'macaddress': mac_addr}}}}
- passed_netcfg = copy.copy(netcfg)
- self.m_netfail_master.return_value = False
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual([mock.call(nic_name)],
- self.m_netfail_master.call_args_list)
-
- def test_skips_v2_non_ethernet_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'wlps0'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 2, 'wifis': {
- nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name,
- 'match': {'macaddress': mac_addr}}}}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual(0, self.m_netfail_master.call_count)
-
- def test_removes_master_mac_property_v2(self):
- nic_master, mac_master = 'ens3', self.random_string()
- nic_other, mac_other = 'ens7', self.random_string()
- nic_extra, mac_extra = 'enp0s1f2', self.random_string()
- self.m_get_interfaces_by_mac.return_value = {
- mac_master: nic_master,
- mac_other: nic_other,
- mac_extra: nic_extra,
- }
- netcfg = {'version': 2, 'ethernets': {
- nic_extra: {'dhcp4': True, 'set-name': nic_extra,
- 'match': {'macaddress': mac_extra}},
- nic_other: {'dhcp4': True, 'set-name': nic_other,
- 'match': {'macaddress': mac_other}},
- nic_master: {'dhcp4': True, 'set-name': nic_master,
- 'match': {'macaddress': mac_master}},
- }}
-
- def _is_netfail_master(iface):
- if iface == 'ens3':
- return True
- return False
- self.m_netfail_master.side_effect = _is_netfail_master
-
- expected_cfg = {'version': 2, 'ethernets': {
- nic_master: {'dhcp4': True, 'match': {'name': nic_master}},
- nic_extra: {'dhcp4': True, 'set-name': nic_extra,
- 'match': {'macaddress': mac_extra}},
- nic_other: {'dhcp4': True, 'set-name': nic_other,
- 'match': {'macaddress': mac_other}},
- }}
- oracle._ensure_netfailover_safe(netcfg)
- import pprint
- pprint.pprint(netcfg)
- print('---- ^^ modified ^^ ---- vv original vv ----')
- pprint.pprint(expected_cfg)
- self.assertEqual(expected_cfg, netcfg)
-
-
-def _mock_v2_urls(httpretty):
- def instance_callback(request, uri, response_headers):
- print(response_headers)
- assert request.headers.get("Authorization") == "Bearer Oracle"
- return [200, response_headers, OPC_V2_METADATA]
-
- def vnics_callback(request, uri, response_headers):
- assert request.headers.get("Authorization") == "Bearer Oracle"
- return [200, response_headers, OPC_BM_SECONDARY_VNIC_RESPONSE]
-
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- body=instance_callback
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/vnics/",
- body=vnics_callback
- )
-
-
-def _mock_no_v2_urls(httpretty):
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- status=404,
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/instance/",
- body=OPC_V1_METADATA
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/vnics/",
- body=OPC_BM_SECONDARY_VNIC_RESPONSE
- )
-
-
-class TestReadOpcMetadata:
- # See https://docs.pytest.org/en/stable/example
- # /parametrize.html#parametrizing-conditional-raising
- does_not_raise = ExitStack
-
- @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None)
- @pytest.mark.parametrize(
- 'version,setup_urls,instance_data,fetch_vnics,vnics_data', [
- (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), True,
- json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)),
- (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), False, None),
- (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), True,
- json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)),
- (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), False, None),
- ]
- )
- def test_metadata_returned(
- self, version, setup_urls, instance_data,
- fetch_vnics, vnics_data, httpretty
- ):
- setup_urls(httpretty)
- metadata = oracle.read_opc_metadata(fetch_vnics_data=fetch_vnics)
-
- assert version == metadata.version
- assert instance_data == metadata.instance_data
- assert vnics_data == metadata.vnics_data
-
- # No need to actually wait between retries in the tests
- @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None)
- @pytest.mark.parametrize(
- "v2_failure_count,v1_failure_count,expected_body,expectation",
- [
- (1, 0, json.loads(OPC_V2_METADATA), does_not_raise()),
- (2, 0, json.loads(OPC_V2_METADATA), does_not_raise()),
- (3, 0, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 1, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 2, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 3, None, pytest.raises(UrlError)),
- ]
- )
- def test_retries(self, v2_failure_count, v1_failure_count,
- expected_body, expectation, httpretty):
- v2_responses = [httpretty.Response("", status=404)] * v2_failure_count
- v2_responses.append(httpretty.Response(OPC_V2_METADATA))
- v1_responses = [httpretty.Response("", status=404)] * v1_failure_count
- v1_responses.append(httpretty.Response(OPC_V1_METADATA))
-
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/instance/",
- responses=v1_responses,
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- responses=v2_responses,
- )
- with expectation:
- assert expected_body == oracle.read_opc_metadata().instance_data
-
-
-class TestCommon_GetDataBehaviour:
- """This test class tests behaviour common to iSCSI and non-iSCSI root.
-
- It defines a fixture, parameterized_oracle_ds, which is used in all the
- tests herein to test that the commonly expected behaviour is the same with
- iSCSI root and without.
-
- (As non-iSCSI root behaviour is a superset of iSCSI root behaviour this
- class is implicitly also testing all iSCSI root behaviour so there is no
- separate class for that case.)
- """
-
- @pytest.yield_fixture(params=[True, False])
- def parameterized_oracle_ds(self, request, oracle_ds):
- """oracle_ds parameterized for iSCSI and non-iSCSI root respectively"""
- is_iscsi_root = request.param
- with ExitStack() as stack:
- stack.enter_context(
- mock.patch(
- DS_PATH + "._is_iscsi_root", return_value=is_iscsi_root
- )
- )
- if not is_iscsi_root:
- stack.enter_context(
- mock.patch(DS_PATH + ".net.find_fallback_nic")
- )
- stack.enter_context(
- mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
- )
- yield oracle_ds
-
- @mock.patch(
- DS_PATH + "._is_platform_viable", mock.Mock(return_value=False)
- )
- def test_false_if_platform_not_viable(
- self, parameterized_oracle_ds,
- ):
- assert not parameterized_oracle_ds._get_data()
-
- @pytest.mark.parametrize(
- "keyname,expected_value",
- (
- ("availability-zone", "phx-ad-3"),
- ("launch-index", 0),
- ("local-hostname", "instance-20200320-1400"),
- (
- "instance-id",
- "ocid1.instance.oc1.phx"
- ".anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
- ),
- ("name", "instance-20200320-1400"),
- (
- "public_keys",
- "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
- ),
- ),
- )
- def test_metadata_keys_set_correctly(
- self, keyname, expected_value, parameterized_oracle_ds,
- ):
- assert parameterized_oracle_ds._get_data()
- assert expected_value == parameterized_oracle_ds.metadata[keyname]
-
- @pytest.mark.parametrize(
- "attribute_name,expected_value",
- [
- ("_crawled_metadata", json.loads(OPC_V2_METADATA)),
- (
- "userdata_raw",
- base64.b64decode(b"IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"),
- ),
- ("system_uuid", "my-test-uuid"),
- ],
- )
- @mock.patch(
- DS_PATH + "._read_system_uuid", mock.Mock(return_value="my-test-uuid")
- )
- def test_attributes_set_correctly(
- self, attribute_name, expected_value, parameterized_oracle_ds,
- ):
- assert parameterized_oracle_ds._get_data()
- assert expected_value == getattr(
- parameterized_oracle_ds, attribute_name
- )
-
- @pytest.mark.parametrize(
- "ssh_keys,expected_value",
- [
- # No SSH keys in metadata => no keys detected
- (None, []),
- # Empty SSH keys in metadata => no keys detected
- ("", []),
- # Single SSH key in metadata => single key detected
- ("ssh-rsa ... test@test", ["ssh-rsa ... test@test"]),
- # Multiple SSH keys in metadata => multiple keys detected
- (
- "ssh-rsa ... test@test\nssh-rsa ... test2@test2",
- ["ssh-rsa ... test@test", "ssh-rsa ... test2@test2"],
- ),
- ],
- )
- def test_public_keys_handled_correctly(
- self, ssh_keys, expected_value, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- if ssh_keys is None:
- del instance_data["metadata"]["ssh_authorized_keys"]
- else:
- instance_data["metadata"]["ssh_authorized_keys"] = ssh_keys
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
- assert (
- expected_value == parameterized_oracle_ds.get_public_ssh_keys()
- )
-
- def test_missing_user_data_handled_gracefully(
- self, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- del instance_data["metadata"]["user_data"]
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
-
- assert parameterized_oracle_ds.userdata_raw is None
-
- def test_missing_metadata_handled_gracefully(
- self, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- del instance_data["metadata"]
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
-
- assert parameterized_oracle_ds.userdata_raw is None
- assert [] == parameterized_oracle_ds.get_public_ssh_keys()
-
-
-@mock.patch(DS_PATH + "._is_iscsi_root", lambda: False)
-class TestNonIscsiRoot_GetDataBehaviour:
- @mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
- @mock.patch(DS_PATH + ".net.find_fallback_nic")
- def test_read_opc_metadata_called_with_ephemeral_dhcp(
- self, m_find_fallback_nic, m_EphemeralDHCPv4, oracle_ds
- ):
- in_context_manager = False
-
- def enter_context_manager():
- nonlocal in_context_manager
- in_context_manager = True
-
- def exit_context_manager(*args):
- nonlocal in_context_manager
- in_context_manager = False
-
- m_EphemeralDHCPv4.return_value.__enter__.side_effect = (
- enter_context_manager
- )
- m_EphemeralDHCPv4.return_value.__exit__.side_effect = (
- exit_context_manager
- )
-
- def assert_in_context_manager(**kwargs):
- assert in_context_manager
- return mock.MagicMock()
-
- with mock.patch(
- DS_PATH + ".read_opc_metadata",
- mock.Mock(side_effect=assert_in_context_manager),
- ):
- assert oracle_ds._get_data()
-
- assert [
- mock.call(
- iface=m_find_fallback_nic.return_value,
- connectivity_url_data={
- 'headers': {
- 'Authorization': 'Bearer Oracle'
- },
- 'url': 'http://169.254.169.254/opc/v2/instance/'
- }
- )
- ] == m_EphemeralDHCPv4.call_args_list
-
-
-@mock.patch(DS_PATH + ".get_interfaces_by_mac", lambda: {})
-@mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
-class TestNetworkConfig:
- def test_network_config_cached(self, m_read_initramfs_config, oracle_ds):
- """.network_config should be cached"""
- assert 0 == m_read_initramfs_config.call_count
- oracle_ds.network_config # pylint: disable=pointless-statement
- assert 1 == m_read_initramfs_config.call_count
- oracle_ds.network_config # pylint: disable=pointless-statement
- assert 1 == m_read_initramfs_config.call_count
-
- def test_network_cmdline(self, m_read_initramfs_config, oracle_ds):
- """network_config should prefer initramfs config over fallback"""
- ncfg = {"version": 1, "config": [{"a": "b"}]}
- m_read_initramfs_config.return_value = copy.deepcopy(ncfg)
-
- assert ncfg == oracle_ds.network_config
- assert 0 == oracle_ds.distro.generate_fallback_config.call_count
-
- def test_network_fallback(self, m_read_initramfs_config, oracle_ds):
- """network_config should prefer initramfs config over fallback"""
- ncfg = {"version": 1, "config": [{"a": "b"}]}
-
- m_read_initramfs_config.return_value = None
- oracle_ds.distro.generate_fallback_config.return_value = copy.deepcopy(
- ncfg
- )
-
- assert ncfg == oracle_ds.network_config
-
- @pytest.mark.parametrize(
- "configure_secondary_nics,expect_secondary_nics",
- [(True, True), (False, False), (None, False)],
- )
- def test_secondary_nic_addition(
- self,
- m_read_initramfs_config,
- configure_secondary_nics,
- expect_secondary_nics,
- oracle_ds,
- ):
- """Test that _add_network_config_from_opc_imds is called as expected
-
- (configure_secondary_nics=None is used to test the default behaviour.)
- """
- m_read_initramfs_config.return_value = {"version": 1, "config": []}
-
- if configure_secondary_nics is not None:
- oracle_ds.ds_cfg[
- "configure_secondary_nics"
- ] = configure_secondary_nics
-
- def side_effect(self):
- self._network_config["secondary_added"] = mock.sentinel.needle
-
- oracle_ds._vnics_data = 'DummyData'
- with mock.patch.object(
- oracle.DataSourceOracle, "_add_network_config_from_opc_imds",
- new=side_effect,
- ):
- was_secondary_added = "secondary_added" in oracle_ds.network_config
- assert expect_secondary_nics == was_secondary_added
-
- def test_secondary_nic_failure_isnt_blocking(
- self,
- m_read_initramfs_config,
- caplog,
- oracle_ds,
- ):
- oracle_ds.ds_cfg["configure_secondary_nics"] = True
- oracle_ds._vnics_data = "DummyData"
-
- with mock.patch.object(
- oracle.DataSourceOracle, "_add_network_config_from_opc_imds",
- side_effect=Exception()
- ):
- network_config = oracle_ds.network_config
- assert network_config == m_read_initramfs_config.return_value
- assert "Failed to parse secondary network configuration" in caplog.text
-
- def test_ds_network_cfg_preferred_over_initramfs(self, _m):
- """Ensure that DS net config is preferred over initramfs config"""
- config_sources = oracle.DataSourceOracle.network_config_sources
- ds_idx = config_sources.index(NetworkConfigSource.ds)
- initramfs_idx = config_sources.index(NetworkConfigSource.initramfs)
- assert ds_idx < initramfs_idx
-
-
-# vi: ts=4 expandtab