summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py97
-rw-r--r--tests/unittests/test_datasource/test_maas.py8
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py157
3 files changed, 209 insertions, 53 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index aa5b98ed..930086db 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -11,6 +11,7 @@ from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
+from tests.unittests import helpers as unit_helpers
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -89,23 +90,22 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- provided_name = dev_name[len('/dev/'):]
- provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -122,19 +122,18 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -156,17 +155,16 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
- my_mock.restore()
+ with unit_helpers.mocker(verify_calls=False) as my_mock:
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -259,19 +257,25 @@ class TestConfigDriveDataSource(MockerTestCase):
ds.read_config_drive_dir, my_d)
def test_find_candidates(self):
- devs_with_answers = {
- "TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ devs_with_answers = {}
def my_devs_with(criteria):
return devs_with_answers[criteria]
+ def my_is_partition(dev):
+ return dev[-1] in "0123456789" and not dev.startswith("sr")
+
try:
orig_find_devs_with = util.find_devs_with
util.find_devs_with = my_devs_with
+ orig_is_partition = util.is_partition
+ util.is_partition = my_is_partition
+
+ devs_with_answers = {"TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -287,6 +291,7 @@ class TestConfigDriveDataSource(MockerTestCase):
finally:
util.find_devs_with = orig_find_devs_with
+ util.is_partition = orig_is_partition
def test_pubkeys_v2(self):
"""Verify that public-keys work in config-drive-v2."""
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 85e6add0..b56fea82 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,6 +3,7 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
+from tests.unittests.helpers import populate_dir
from mocker import MockerTestCase
@@ -137,11 +138,4 @@ class TestMAASDataSource(MockerTestCase):
pass
-def populate_dir(seed_dir, files):
- os.mkdir(seed_dir)
- for (name, content) in files.iteritems():
- with open(os.path.join(seed_dir, name), "w") as fp:
- fp.write(content)
- fp.close()
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
new file mode 100644
index 00000000..62fc5358
--- /dev/null
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -0,0 +1,157 @@
+from cloudinit import helpers
+from cloudinit.sources import DataSourceNoCloud
+from cloudinit import util
+from tests.unittests.helpers import populate_dir
+
+from mocker import MockerTestCase
+import os
+import yaml
+
+
+class TestNoCloudDataSource(MockerTestCase):
+
+ def setUp(self):
+ self.tmp = self.makeDir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.cmdline = "root=TESTCMDLINE"
+
+ self.unapply = []
+ self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _getcmdline(self):
+ return self.cmdline
+
+ def test_nocloud_seed_dir(self):
+ md = {'instance-id': 'IID', 'dsmode': 'local'}
+ ud = "USER_DATA_HERE"
+ populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': ud, 'meta-data': yaml.safe_dump(md)})
+
+ sys_cfg = {
+ 'datasource': {'NoCloud': {'fs_label': None}}
+ }
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, ud)
+ self.assertEqual(dsrc.metadata, md)
+ self.assertTrue(ret)
+
+ def test_fs_label(self):
+ #find_devs_with should not be called ff fs_label is None
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ class PsuedoException(Exception):
+ pass
+
+ def my_find_devs_with(*args, **kwargs):
+ _f = (args, kwargs)
+ raise PsuedoException
+
+ self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+
+ # by default, NoCloud should search for filesystems by label
+ sys_cfg = {'datasource': {'NoCloud': {}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(PsuedoException, dsrc.get_data)
+
+ # but disabling searching should just end up with None found
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+
+ def test_no_datasource_expected(self):
+ #no source should be found if no cmdline, config, and fs_label=None
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertFalse(dsrc.get_data())
+
+ def test_seed_in_config(self):
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ data = {
+ 'fs_label': None,
+ 'meta-data': {'instance-id': 'IID'},
+ 'user-data': "USER_DATA_RAW",
+ }
+
+ sys_cfg = {'datasource': {'NoCloud': data}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
+ self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
+ self.assertTrue(ret)
+
+
+class TestParseCommandLineData(MockerTestCase):
+
+ def test_parse_cmdline_data_valid(self):
+ ds_id = "ds=nocloud"
+ pairs = (
+ ("root=/dev/sda1 %(ds_id)s", {}),
+ ("%(ds_id)s; root=/dev/foo", {}),
+ ("%(ds_id)s", {}),
+ ("%(ds_id)s;", {}),
+ ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
+ ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
+ {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost",
+ {'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost;i=IID",
+ {'local-hostname': 'xhost', 'instance-id': 'IID'}),
+ )
+
+ for (fmt, expected) in pairs:
+ fill = {}
+ cmdline = fmt % {'ds_id': ds_id}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(expected, fill)
+ self.assertTrue(ret)
+
+ def test_parse_cmdline_data_none(self):
+ ds_id = "ds=foo"
+ cmdlines = (
+ "root=/dev/sda1 ro",
+ "console=/dev/ttyS0 root=/dev/foo",
+ "",
+ "ds=foocloud",
+ "ds=foo-net",
+ "ds=nocloud;s=SEED",
+ )
+
+ for cmdline in cmdlines:
+ fill = {}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(fill, {})
+ self.assertFalse(ret)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
+
+
+# vi: ts=4 expandtab