diff options
Diffstat (limited to 'tests/unittests/sources/test_nocloud.py')
-rw-r--r-- | tests/unittests/sources/test_nocloud.py | 453 |
1 files changed, 453 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_nocloud.py b/tests/unittests/sources/test_nocloud.py new file mode 100644 index 00000000..1f6b722d --- /dev/null +++ b/tests/unittests/sources/test_nocloud.py @@ -0,0 +1,453 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +import textwrap + +import yaml + +from cloudinit import dmi, helpers, util +from cloudinit.sources.DataSourceNoCloud import DataSourceNoCloud as dsNoCloud +from cloudinit.sources.DataSourceNoCloud import ( + _maybe_remove_top_network, + parse_cmdline_data, +) +from tests.unittests.helpers import CiTestCase, ExitStack, mock, populate_dir + + +@mock.patch("cloudinit.sources.DataSourceNoCloud.util.is_lxd") +class TestNoCloudDataSource(CiTestCase): + def setUp(self): + super(TestNoCloudDataSource, self).setUp() + self.tmp = self.tmp_dir() + self.paths = helpers.Paths( + {"cloud_dir": self.tmp, "run_dir": self.tmp} + ) + + self.cmdline = "root=TESTCMDLINE" + + self.mocks = ExitStack() + self.addCleanup(self.mocks.close) + + self.mocks.enter_context( + mock.patch.object(util, "get_cmdline", return_value=self.cmdline) + ) + self.mocks.enter_context( + mock.patch.object(dmi, "read_dmi_data", return_value=None) + ) + + def _test_fs_config_is_read(self, fs_label, fs_label_to_search): + vfat_device = "device-1" + + def m_mount_cb(device, callback, mtype): + if device == vfat_device: + return {"meta-data": yaml.dump({"instance-id": "IID"})} + else: + return {} + + def m_find_devs_with(query="", path=""): + if "TYPE=vfat" == query: + return [vfat_device] + elif "LABEL={}".format(fs_label) == query: + return [vfat_device] + else: + return [] + + self.mocks.enter_context( + mock.patch.object( + util, "find_devs_with", side_effect=m_find_devs_with + ) + ) + self.mocks.enter_context( + mock.patch.object(util, "mount_cb", side_effect=m_mount_cb) + ) + sys_cfg = {"datasource": {"NoCloud": {"fs_label": fs_label_to_search}}} + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + + self.assertEqual(dsrc.metadata.get("instance-id"), "IID") + self.assertTrue(ret) + + def test_nocloud_seed_dir_on_lxd(self, m_is_lxd): + md = {"instance-id": "IID", "dsmode": "local"} + ud = b"USER_DATA_HERE" + seed_dir = os.path.join(self.paths.seed_dir, "nocloud") + populate_dir( + seed_dir, {"user-data": ud, "meta-data": yaml.safe_dump(md)} + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, ud) + self.assertEqual(dsrc.metadata, md) + self.assertEqual(dsrc.platform_type, "lxd") + self.assertEqual(dsrc.subplatform, "seed-dir (%s)" % seed_dir) + self.assertTrue(ret) + + def test_nocloud_seed_dir_non_lxd_platform_is_nocloud(self, m_is_lxd): + """Non-lxd environments will list nocloud as the platform.""" + m_is_lxd.return_value = False + md = {"instance-id": "IID", "dsmode": "local"} + seed_dir = os.path.join(self.paths.seed_dir, "nocloud") + populate_dir( + seed_dir, {"user-data": "", "meta-data": yaml.safe_dump(md)} + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertTrue(dsrc.get_data()) + self.assertEqual(dsrc.platform_type, "nocloud") + self.assertEqual(dsrc.subplatform, "seed-dir (%s)" % seed_dir) + + def test_fs_label(self, m_is_lxd): + # find_devs_with should not be called ff fs_label is None + class PsuedoException(Exception): + pass + + self.mocks.enter_context( + mock.patch.object( + util, "find_devs_with", side_effect=PsuedoException + ) + ) + + # by default, NoCloud should search for filesystems by label + sys_cfg = {"datasource": {"NoCloud": {}}} + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertRaises(PsuedoException, dsrc.get_data) + + # but disabling searching should just end up with None found + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertFalse(ret) + + def test_fs_config_lowercase_label(self, m_is_lxd): + self._test_fs_config_is_read("cidata", "cidata") + + def test_fs_config_uppercase_label(self, m_is_lxd): + self._test_fs_config_is_read("CIDATA", "cidata") + + def test_fs_config_lowercase_label_search_uppercase(self, m_is_lxd): + self._test_fs_config_is_read("cidata", "CIDATA") + + def test_fs_config_uppercase_label_search_uppercase(self, m_is_lxd): + self._test_fs_config_is_read("CIDATA", "CIDATA") + + def test_no_datasource_expected(self, m_is_lxd): + # no source should be found if no cmdline, config, and fs_label=None + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertFalse(dsrc.get_data()) + + def test_seed_in_config(self, m_is_lxd): + data = { + "fs_label": None, + "meta-data": yaml.safe_dump({"instance-id": "IID"}), + "user-data": b"USER_DATA_RAW", + } + + sys_cfg = {"datasource": {"NoCloud": data}} + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW") + self.assertEqual(dsrc.metadata.get("instance-id"), "IID") + self.assertTrue(ret) + + def test_nocloud_seed_with_vendordata(self, m_is_lxd): + md = {"instance-id": "IID", "dsmode": "local"} + ud = b"USER_DATA_HERE" + vd = b"THIS IS MY VENDOR_DATA" + + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + { + "user-data": ud, + "meta-data": yaml.safe_dump(md), + "vendor-data": vd, + }, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, ud) + self.assertEqual(dsrc.metadata, md) + self.assertEqual(dsrc.vendordata_raw, vd) + self.assertTrue(ret) + + def test_nocloud_no_vendordata(self, m_is_lxd): + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + {"user-data": b"ud", "meta-data": "instance-id: IID\n"}, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, b"ud") + self.assertFalse(dsrc.vendordata) + self.assertTrue(ret) + + def test_metadata_network_interfaces(self, m_is_lxd): + gateway = "103.225.10.1" + md = { + "instance-id": "i-abcd", + "local-hostname": "hostname1", + "network-interfaces": textwrap.dedent( + """\ + auto eth0 + iface eth0 inet static + hwaddr 00:16:3e:70:e1:04 + address 103.225.10.12 + netmask 255.255.255.0 + gateway """ + + gateway + + """ + dns-servers 8.8.8.8""" + ), + } + + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + {"user-data": b"ud", "meta-data": yaml.dump(md) + "\n"}, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + # very simple check just for the strings above + self.assertIn(gateway, str(dsrc.network_config)) + + def test_metadata_network_config(self, m_is_lxd): + # network-config needs to get into network_config + netconf = { + "version": 1, + "config": [ + { + "type": "physical", + "name": "interface0", + "subnets": [{"type": "dhcp"}], + } + ], + } + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + { + "user-data": b"ud", + "meta-data": "instance-id: IID\n", + "network-config": yaml.dump(netconf) + "\n", + }, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(netconf, dsrc.network_config) + + def test_metadata_network_config_with_toplevel_network(self, m_is_lxd): + """network-config may have 'network' top level key.""" + netconf = {"config": "disabled"} + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + { + "user-data": b"ud", + "meta-data": "instance-id: IID\n", + "network-config": yaml.dump({"network": netconf}) + "\n", + }, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(netconf, dsrc.network_config) + + def test_metadata_network_config_over_interfaces(self, m_is_lxd): + # network-config should override meta-data/network-interfaces + gateway = "103.225.10.1" + md = { + "instance-id": "i-abcd", + "local-hostname": "hostname1", + "network-interfaces": textwrap.dedent( + """\ + auto eth0 + iface eth0 inet static + hwaddr 00:16:3e:70:e1:04 + address 103.225.10.12 + netmask 255.255.255.0 + gateway """ + + gateway + + """ + dns-servers 8.8.8.8""" + ), + } + + netconf = { + "version": 1, + "config": [ + { + "type": "physical", + "name": "interface0", + "subnets": [{"type": "dhcp"}], + } + ], + } + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + { + "user-data": b"ud", + "meta-data": yaml.dump(md) + "\n", + "network-config": yaml.dump(netconf) + "\n", + }, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(netconf, dsrc.network_config) + self.assertNotIn(gateway, str(dsrc.network_config)) + + @mock.patch("cloudinit.util.blkid") + def test_nocloud_get_devices_freebsd(self, m_is_lxd, fake_blkid): + populate_dir( + os.path.join(self.paths.seed_dir, "nocloud"), + {"user-data": b"ud", "meta-data": "instance-id: IID\n"}, + ) + + sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}} + + self.mocks.enter_context( + mock.patch.object(util, "is_FreeBSD", return_value=True) + ) + + def _mfind_devs_with_freebsd( + criteria=None, + oformat="device", + tag=None, + no_cache=False, + path=None, + ): + if not criteria: + return ["/dev/msdosfs/foo", "/dev/iso9660/foo"] + if criteria.startswith("LABEL="): + return ["/dev/msdosfs/foo", "/dev/iso9660/foo"] + elif criteria == "TYPE=vfat": + return ["/dev/msdosfs/foo"] + elif criteria == "TYPE=iso9660": + return ["/dev/iso9660/foo"] + return [] + + self.mocks.enter_context( + mock.patch.object( + util, + "find_devs_with_freebsd", + side_effect=_mfind_devs_with_freebsd, + ) + ) + + dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc._get_devices("foo") + self.assertEqual(["/dev/msdosfs/foo", "/dev/iso9660/foo"], ret) + fake_blkid.assert_not_called() + + +class TestParseCommandLineData(CiTestCase): + def test_parse_cmdline_data_valid(self): + ds_id = "ds=nocloud" + pairs = ( + ("root=/dev/sda1 %(ds_id)s", {}), + ("%(ds_id)s; root=/dev/foo", {}), + ("%(ds_id)s", {}), + ("%(ds_id)s;", {}), + ("%(ds_id)s;s=SEED", {"seedfrom": "SEED"}), + ( + "%(ds_id)s;seedfrom=SEED;local-hostname=xhost", + {"seedfrom": "SEED", "local-hostname": "xhost"}, + ), + ("%(ds_id)s;h=xhost", {"local-hostname": "xhost"}), + ( + "%(ds_id)s;h=xhost;i=IID", + {"local-hostname": "xhost", "instance-id": "IID"}, + ), + ) + + for (fmt, expected) in pairs: + fill = {} + cmdline = fmt % {"ds_id": ds_id} + ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline) + self.assertEqual(expected, fill) + self.assertTrue(ret) + + def test_parse_cmdline_data_none(self): + ds_id = "ds=foo" + cmdlines = ( + "root=/dev/sda1 ro", + "console=/dev/ttyS0 root=/dev/foo", + "", + "ds=foocloud", + "ds=foo-net", + "ds=nocloud;s=SEED", + ) + + for cmdline in cmdlines: + fill = {} + ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline) + self.assertEqual(fill, {}) + self.assertFalse(ret) + + +class TestMaybeRemoveToplevelNetwork(CiTestCase): + """test _maybe_remove_top_network function.""" + + basecfg = [ + { + "type": "physical", + "name": "interface0", + "subnets": [{"type": "dhcp"}], + } + ] + + def test_should_remove_safely(self): + mcfg = {"config": self.basecfg, "version": 1} + self.assertEqual(mcfg, _maybe_remove_top_network({"network": mcfg})) + + def test_no_remove_if_other_keys(self): + """should not shift if other keys at top level.""" + mcfg = { + "network": {"config": self.basecfg, "version": 1}, + "unknown_keyname": "keyval", + } + self.assertEqual(mcfg, _maybe_remove_top_network(mcfg)) + + def test_no_remove_if_non_dict(self): + """should not shift if not a dict.""" + mcfg = {"network": '"content here'} + self.assertEqual(mcfg, _maybe_remove_top_network(mcfg)) + + def test_no_remove_if_missing_config_or_version(self): + """should not shift unless network entry has config and version.""" + mcfg = {"network": {"config": self.basecfg}} + self.assertEqual(mcfg, _maybe_remove_top_network(mcfg)) + + mcfg = {"network": {"version": 1}} + self.assertEqual(mcfg, _maybe_remove_top_network(mcfg)) + + def test_remove_with_config_disabled(self): + """network/config=disabled should be shifted.""" + mcfg = {"config": "disabled"} + self.assertEqual(mcfg, _maybe_remove_top_network({"network": mcfg})) + + +# vi: ts=4 expandtab |