summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/test_nocloud.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/test_nocloud.py')
-rw-r--r--tests/unittests/sources/test_nocloud.py453
1 files changed, 453 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_nocloud.py b/tests/unittests/sources/test_nocloud.py
new file mode 100644
index 00000000..1f6b722d
--- /dev/null
+++ b/tests/unittests/sources/test_nocloud.py
@@ -0,0 +1,453 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+import textwrap
+
+import yaml
+
+from cloudinit import dmi, helpers, util
+from cloudinit.sources.DataSourceNoCloud import DataSourceNoCloud as dsNoCloud
+from cloudinit.sources.DataSourceNoCloud import (
+ _maybe_remove_top_network,
+ parse_cmdline_data,
+)
+from tests.unittests.helpers import CiTestCase, ExitStack, mock, populate_dir
+
+
+@mock.patch("cloudinit.sources.DataSourceNoCloud.util.is_lxd")
+class TestNoCloudDataSource(CiTestCase):
+ def setUp(self):
+ super(TestNoCloudDataSource, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {"cloud_dir": self.tmp, "run_dir": self.tmp}
+ )
+
+ self.cmdline = "root=TESTCMDLINE"
+
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
+
+ self.mocks.enter_context(
+ mock.patch.object(util, "get_cmdline", return_value=self.cmdline)
+ )
+ self.mocks.enter_context(
+ mock.patch.object(dmi, "read_dmi_data", return_value=None)
+ )
+
+ def _test_fs_config_is_read(self, fs_label, fs_label_to_search):
+ vfat_device = "device-1"
+
+ def m_mount_cb(device, callback, mtype):
+ if device == vfat_device:
+ return {"meta-data": yaml.dump({"instance-id": "IID"})}
+ else:
+ return {}
+
+ def m_find_devs_with(query="", path=""):
+ if "TYPE=vfat" == query:
+ return [vfat_device]
+ elif "LABEL={}".format(fs_label) == query:
+ return [vfat_device]
+ else:
+ return []
+
+ self.mocks.enter_context(
+ mock.patch.object(
+ util, "find_devs_with", side_effect=m_find_devs_with
+ )
+ )
+ self.mocks.enter_context(
+ mock.patch.object(util, "mount_cb", side_effect=m_mount_cb)
+ )
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": fs_label_to_search}}}
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+
+ self.assertEqual(dsrc.metadata.get("instance-id"), "IID")
+ self.assertTrue(ret)
+
+ def test_nocloud_seed_dir_on_lxd(self, m_is_lxd):
+ md = {"instance-id": "IID", "dsmode": "local"}
+ ud = b"USER_DATA_HERE"
+ seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
+ populate_dir(
+ seed_dir, {"user-data": ud, "meta-data": yaml.safe_dump(md)}
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, ud)
+ self.assertEqual(dsrc.metadata, md)
+ self.assertEqual(dsrc.platform_type, "lxd")
+ self.assertEqual(dsrc.subplatform, "seed-dir (%s)" % seed_dir)
+ self.assertTrue(ret)
+
+ def test_nocloud_seed_dir_non_lxd_platform_is_nocloud(self, m_is_lxd):
+ """Non-lxd environments will list nocloud as the platform."""
+ m_is_lxd.return_value = False
+ md = {"instance-id": "IID", "dsmode": "local"}
+ seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
+ populate_dir(
+ seed_dir, {"user-data": "", "meta-data": yaml.safe_dump(md)}
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertTrue(dsrc.get_data())
+ self.assertEqual(dsrc.platform_type, "nocloud")
+ self.assertEqual(dsrc.subplatform, "seed-dir (%s)" % seed_dir)
+
+ def test_fs_label(self, m_is_lxd):
+ # find_devs_with should not be called ff fs_label is None
+ class PsuedoException(Exception):
+ pass
+
+ self.mocks.enter_context(
+ mock.patch.object(
+ util, "find_devs_with", side_effect=PsuedoException
+ )
+ )
+
+ # by default, NoCloud should search for filesystems by label
+ sys_cfg = {"datasource": {"NoCloud": {}}}
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(PsuedoException, dsrc.get_data)
+
+ # but disabling searching should just end up with None found
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+
+ def test_fs_config_lowercase_label(self, m_is_lxd):
+ self._test_fs_config_is_read("cidata", "cidata")
+
+ def test_fs_config_uppercase_label(self, m_is_lxd):
+ self._test_fs_config_is_read("CIDATA", "cidata")
+
+ def test_fs_config_lowercase_label_search_uppercase(self, m_is_lxd):
+ self._test_fs_config_is_read("cidata", "CIDATA")
+
+ def test_fs_config_uppercase_label_search_uppercase(self, m_is_lxd):
+ self._test_fs_config_is_read("CIDATA", "CIDATA")
+
+ def test_no_datasource_expected(self, m_is_lxd):
+ # no source should be found if no cmdline, config, and fs_label=None
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertFalse(dsrc.get_data())
+
+ def test_seed_in_config(self, m_is_lxd):
+ data = {
+ "fs_label": None,
+ "meta-data": yaml.safe_dump({"instance-id": "IID"}),
+ "user-data": b"USER_DATA_RAW",
+ }
+
+ sys_cfg = {"datasource": {"NoCloud": data}}
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
+ self.assertEqual(dsrc.metadata.get("instance-id"), "IID")
+ self.assertTrue(ret)
+
+ def test_nocloud_seed_with_vendordata(self, m_is_lxd):
+ md = {"instance-id": "IID", "dsmode": "local"}
+ ud = b"USER_DATA_HERE"
+ vd = b"THIS IS MY VENDOR_DATA"
+
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {
+ "user-data": ud,
+ "meta-data": yaml.safe_dump(md),
+ "vendor-data": vd,
+ },
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, ud)
+ self.assertEqual(dsrc.metadata, md)
+ self.assertEqual(dsrc.vendordata_raw, vd)
+ self.assertTrue(ret)
+
+ def test_nocloud_no_vendordata(self, m_is_lxd):
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {"user-data": b"ud", "meta-data": "instance-id: IID\n"},
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, b"ud")
+ self.assertFalse(dsrc.vendordata)
+ self.assertTrue(ret)
+
+ def test_metadata_network_interfaces(self, m_is_lxd):
+ gateway = "103.225.10.1"
+ md = {
+ "instance-id": "i-abcd",
+ "local-hostname": "hostname1",
+ "network-interfaces": textwrap.dedent(
+ """\
+ auto eth0
+ iface eth0 inet static
+ hwaddr 00:16:3e:70:e1:04
+ address 103.225.10.12
+ netmask 255.255.255.0
+ gateway """
+ + gateway
+ + """
+ dns-servers 8.8.8.8"""
+ ),
+ }
+
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {"user-data": b"ud", "meta-data": yaml.dump(md) + "\n"},
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ # very simple check just for the strings above
+ self.assertIn(gateway, str(dsrc.network_config))
+
+ def test_metadata_network_config(self, m_is_lxd):
+ # network-config needs to get into network_config
+ netconf = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "interface0",
+ "subnets": [{"type": "dhcp"}],
+ }
+ ],
+ }
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {
+ "user-data": b"ud",
+ "meta-data": "instance-id: IID\n",
+ "network-config": yaml.dump(netconf) + "\n",
+ },
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
+
+ def test_metadata_network_config_with_toplevel_network(self, m_is_lxd):
+ """network-config may have 'network' top level key."""
+ netconf = {"config": "disabled"}
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {
+ "user-data": b"ud",
+ "meta-data": "instance-id: IID\n",
+ "network-config": yaml.dump({"network": netconf}) + "\n",
+ },
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
+
+ def test_metadata_network_config_over_interfaces(self, m_is_lxd):
+ # network-config should override meta-data/network-interfaces
+ gateway = "103.225.10.1"
+ md = {
+ "instance-id": "i-abcd",
+ "local-hostname": "hostname1",
+ "network-interfaces": textwrap.dedent(
+ """\
+ auto eth0
+ iface eth0 inet static
+ hwaddr 00:16:3e:70:e1:04
+ address 103.225.10.12
+ netmask 255.255.255.0
+ gateway """
+ + gateway
+ + """
+ dns-servers 8.8.8.8"""
+ ),
+ }
+
+ netconf = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "interface0",
+ "subnets": [{"type": "dhcp"}],
+ }
+ ],
+ }
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {
+ "user-data": b"ud",
+ "meta-data": yaml.dump(md) + "\n",
+ "network-config": yaml.dump(netconf) + "\n",
+ },
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
+ self.assertNotIn(gateway, str(dsrc.network_config))
+
+ @mock.patch("cloudinit.util.blkid")
+ def test_nocloud_get_devices_freebsd(self, m_is_lxd, fake_blkid):
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {"user-data": b"ud", "meta-data": "instance-id: IID\n"},
+ )
+
+ sys_cfg = {"datasource": {"NoCloud": {"fs_label": None}}}
+
+ self.mocks.enter_context(
+ mock.patch.object(util, "is_FreeBSD", return_value=True)
+ )
+
+ def _mfind_devs_with_freebsd(
+ criteria=None,
+ oformat="device",
+ tag=None,
+ no_cache=False,
+ path=None,
+ ):
+ if not criteria:
+ return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
+ if criteria.startswith("LABEL="):
+ return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
+ elif criteria == "TYPE=vfat":
+ return ["/dev/msdosfs/foo"]
+ elif criteria == "TYPE=iso9660":
+ return ["/dev/iso9660/foo"]
+ return []
+
+ self.mocks.enter_context(
+ mock.patch.object(
+ util,
+ "find_devs_with_freebsd",
+ side_effect=_mfind_devs_with_freebsd,
+ )
+ )
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc._get_devices("foo")
+ self.assertEqual(["/dev/msdosfs/foo", "/dev/iso9660/foo"], ret)
+ fake_blkid.assert_not_called()
+
+
+class TestParseCommandLineData(CiTestCase):
+ def test_parse_cmdline_data_valid(self):
+ ds_id = "ds=nocloud"
+ pairs = (
+ ("root=/dev/sda1 %(ds_id)s", {}),
+ ("%(ds_id)s; root=/dev/foo", {}),
+ ("%(ds_id)s", {}),
+ ("%(ds_id)s;", {}),
+ ("%(ds_id)s;s=SEED", {"seedfrom": "SEED"}),
+ (
+ "%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
+ {"seedfrom": "SEED", "local-hostname": "xhost"},
+ ),
+ ("%(ds_id)s;h=xhost", {"local-hostname": "xhost"}),
+ (
+ "%(ds_id)s;h=xhost;i=IID",
+ {"local-hostname": "xhost", "instance-id": "IID"},
+ ),
+ )
+
+ for (fmt, expected) in pairs:
+ fill = {}
+ cmdline = fmt % {"ds_id": ds_id}
+ ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
+ self.assertEqual(expected, fill)
+ self.assertTrue(ret)
+
+ def test_parse_cmdline_data_none(self):
+ ds_id = "ds=foo"
+ cmdlines = (
+ "root=/dev/sda1 ro",
+ "console=/dev/ttyS0 root=/dev/foo",
+ "",
+ "ds=foocloud",
+ "ds=foo-net",
+ "ds=nocloud;s=SEED",
+ )
+
+ for cmdline in cmdlines:
+ fill = {}
+ ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
+ self.assertEqual(fill, {})
+ self.assertFalse(ret)
+
+
+class TestMaybeRemoveToplevelNetwork(CiTestCase):
+ """test _maybe_remove_top_network function."""
+
+ basecfg = [
+ {
+ "type": "physical",
+ "name": "interface0",
+ "subnets": [{"type": "dhcp"}],
+ }
+ ]
+
+ def test_should_remove_safely(self):
+ mcfg = {"config": self.basecfg, "version": 1}
+ self.assertEqual(mcfg, _maybe_remove_top_network({"network": mcfg}))
+
+ def test_no_remove_if_other_keys(self):
+ """should not shift if other keys at top level."""
+ mcfg = {
+ "network": {"config": self.basecfg, "version": 1},
+ "unknown_keyname": "keyval",
+ }
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_no_remove_if_non_dict(self):
+ """should not shift if not a dict."""
+ mcfg = {"network": '"content here'}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_no_remove_if_missing_config_or_version(self):
+ """should not shift unless network entry has config and version."""
+ mcfg = {"network": {"config": self.basecfg}}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ mcfg = {"network": {"version": 1}}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_remove_with_config_disabled(self):
+ """network/config=disabled should be shifted."""
+ mcfg = {"config": "disabled"}
+ self.assertEqual(mcfg, _maybe_remove_top_network({"network": mcfg}))
+
+
+# vi: ts=4 expandtab