diff options
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 97 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 8 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 157 |
3 files changed, 209 insertions, 53 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index aa5b98ed..930086db 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -11,6 +11,7 @@ from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit import util +from tests.unittests import helpers as unit_helpers PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' EC2_META = { @@ -89,23 +90,22 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) - provided_name = dev_name[len('/dev/'):] - provided_name = "s" + provided_name[1:] - find_mock(mocker.ARGS) - my_mock.result([provided_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - my_mock.restore() - self.assertEquals(dev_name, device) + with unit_helpers.mocker() as my_mock: + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + provided_name = dev_name[len('/dev/'):] + provided_name = "s" + provided_name[1:] + find_mock(mocker.ARGS) + my_mock.result([provided_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_os_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -122,19 +122,18 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) - find_mock(mocker.ARGS) - my_mock.result([dev_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - my_mock.restore() - self.assertEquals(dev_name, device) + with unit_helpers.mocker() as my_mock: + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + find_mock(mocker.ARGS) + my_mock.result([dev_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_ec2_remap(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -156,17 +155,16 @@ class TestConfigDriveDataSource(MockerTestCase): 'root2k': None, } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) - my_mock.restore() + with unit_helpers.mocker(verify_calls=False) as my_mock: + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_ec2_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -259,19 +257,25 @@ class TestConfigDriveDataSource(MockerTestCase): ds.read_config_drive_dir, my_d) def test_find_candidates(self): - devs_with_answers = { - "TYPE=vfat": [], - "TYPE=iso9660": ["/dev/vdb"], - "LABEL=config-2": ["/dev/vdb"], - } + devs_with_answers = {} def my_devs_with(criteria): return devs_with_answers[criteria] + def my_is_partition(dev): + return dev[-1] in "0123456789" and not dev.startswith("sr") + try: orig_find_devs_with = util.find_devs_with util.find_devs_with = my_devs_with + orig_is_partition = util.is_partition + util.is_partition = my_is_partition + + devs_with_answers = {"TYPE=vfat": [], + "TYPE=iso9660": ["/dev/vdb"], + "LABEL=config-2": ["/dev/vdb"], + } self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) # add a vfat item @@ -287,6 +291,7 @@ class TestConfigDriveDataSource(MockerTestCase): finally: util.find_devs_with = orig_find_devs_with + util.is_partition = orig_is_partition def test_pubkeys_v2(self): """Verify that public-keys work in config-drive-v2.""" diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 85e6add0..b56fea82 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -3,6 +3,7 @@ import os from cloudinit.sources import DataSourceMAAS from cloudinit import url_helper +from tests.unittests.helpers import populate_dir from mocker import MockerTestCase @@ -137,11 +138,4 @@ class TestMAASDataSource(MockerTestCase): pass -def populate_dir(seed_dir, files): - os.mkdir(seed_dir) - for (name, content) in files.iteritems(): - with open(os.path.join(seed_dir, name), "w") as fp: - fp.write(content) - fp.close() - # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py new file mode 100644 index 00000000..62fc5358 --- /dev/null +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -0,0 +1,157 @@ +from cloudinit import helpers +from cloudinit.sources import DataSourceNoCloud +from cloudinit import util +from tests.unittests.helpers import populate_dir + +from mocker import MockerTestCase +import os +import yaml + + +class TestNoCloudDataSource(MockerTestCase): + + def setUp(self): + self.tmp = self.makeDir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + self.cmdline = "root=TESTCMDLINE" + + self.unapply = [] + self.apply_patches([(util, 'get_cmdline', self._getcmdline)]) + super(TestNoCloudDataSource, self).setUp() + + def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) + super(TestNoCloudDataSource, self).setUp() + + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def _getcmdline(self): + return self.cmdline + + def test_nocloud_seed_dir(self): + md = {'instance-id': 'IID', 'dsmode': 'local'} + ud = "USER_DATA_HERE" + populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), + {'user-data': ud, 'meta-data': yaml.safe_dump(md)}) + + sys_cfg = { + 'datasource': {'NoCloud': {'fs_label': None}} + } + + ds = DataSourceNoCloud.DataSourceNoCloud + + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, ud) + self.assertEqual(dsrc.metadata, md) + self.assertTrue(ret) + + def test_fs_label(self): + #find_devs_with should not be called ff fs_label is None + ds = DataSourceNoCloud.DataSourceNoCloud + + class PsuedoException(Exception): + pass + + def my_find_devs_with(*args, **kwargs): + _f = (args, kwargs) + raise PsuedoException + + self.apply_patches([(util, 'find_devs_with', my_find_devs_with)]) + + # by default, NoCloud should search for filesystems by label + sys_cfg = {'datasource': {'NoCloud': {}}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertRaises(PsuedoException, dsrc.get_data) + + # but disabling searching should just end up with None found + sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertFalse(ret) + + def test_no_datasource_expected(self): + #no source should be found if no cmdline, config, and fs_label=None + sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} + + ds = DataSourceNoCloud.DataSourceNoCloud + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertFalse(dsrc.get_data()) + + def test_seed_in_config(self): + ds = DataSourceNoCloud.DataSourceNoCloud + + data = { + 'fs_label': None, + 'meta-data': {'instance-id': 'IID'}, + 'user-data': "USER_DATA_RAW", + } + + sys_cfg = {'datasource': {'NoCloud': data}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW") + self.assertEqual(dsrc.metadata.get('instance-id'), 'IID') + self.assertTrue(ret) + + +class TestParseCommandLineData(MockerTestCase): + + def test_parse_cmdline_data_valid(self): + ds_id = "ds=nocloud" + pairs = ( + ("root=/dev/sda1 %(ds_id)s", {}), + ("%(ds_id)s; root=/dev/foo", {}), + ("%(ds_id)s", {}), + ("%(ds_id)s;", {}), + ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}), + ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost", + {'seedfrom': 'SEED', 'local-hostname': 'xhost'}), + ("%(ds_id)s;h=xhost", + {'local-hostname': 'xhost'}), + ("%(ds_id)s;h=xhost;i=IID", + {'local-hostname': 'xhost', 'instance-id': 'IID'}), + ) + + for (fmt, expected) in pairs: + fill = {} + cmdline = fmt % {'ds_id': ds_id} + ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, + cmdline=cmdline) + self.assertEqual(expected, fill) + self.assertTrue(ret) + + def test_parse_cmdline_data_none(self): + ds_id = "ds=foo" + cmdlines = ( + "root=/dev/sda1 ro", + "console=/dev/ttyS0 root=/dev/foo", + "", + "ds=foocloud", + "ds=foo-net", + "ds=nocloud;s=SEED", + ) + + for cmdline in cmdlines: + fill = {} + ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, + cmdline=cmdline) + self.assertEqual(fill, {}) + self.assertFalse(ret) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret + + +# vi: ts=4 expandtab |