diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 95 |
1 files changed, 85 insertions, 10 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index f544e145..752638b6 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -1,4 +1,5 @@ from cloudinit.sources import DataSourceOpenNebula as ds +from cloudinit import helpers from cloudinit import util from mocker import MockerTestCase from tests.unittests.helpers import populate_dir @@ -20,6 +21,8 @@ TEST_VARS = { 'VAR12': '$', # expect $ } +INVALID_PARSEUSER = 'cloud-init-mocker-opennebula-invalid' +INVALID_CONTEXT = ';' USER_DATA = '#cloud-config\napt_upgrade: true' SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' HOSTNAME = 'foo.example.com' @@ -38,21 +41,93 @@ class TestOpenNebulaDataSource(MockerTestCase): def setUp(self): super(TestOpenNebulaDataSource, self).setUp() self.tmp = self.makeDir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + # defaults for few tests + self.ds = ds.DataSourceOpenNebula + self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula") + self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}} + + def test_get_data_non_contextdisk(self): + try: + # dont' try to lookup for CDs + orig_find_devs_with = util.find_devs_with + util.find_devs_with = lambda n: [] + + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertFalse(ret) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data_broken_contextdisk(self): + try: + # dont' try to lookup for CDs + orig_find_devs_with = util.find_devs_with + util.find_devs_with = lambda n: [] + + populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data_invalid_identity(self): + try: + # dont' try to lookup for CDs + orig_find_devs_with = util.find_devs_with + util.find_devs_with = lambda n: [] + + sys_cfg = self.sys_cfg + sys_cfg['datasource']['OpenNebula']['parseuser'] = \ + INVALID_PARSEUSER + + populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) + dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data(self): + try: + # dont' try to lookup for CDs + orig_find_devs_with = util.find_devs_with + util.find_devs_with = lambda n: [] + populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + finally: + util.find_devs_with = orig_find_devs_with def test_seed_dir_non_contextdisk(self): - my_d = os.path.join(self.tmp, 'non-contextdisk') - self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d) + self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, + self.seed_dir) + + def test_seed_dir_empty1_context(self): + populate_dir(self.seed_dir, {'context.sh': ''}) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertEqual(results['userdata'], None) + self.assertEqual(results['metadata'], {}) + + def test_seed_dir_empty2_context(self): + populate_context_dir(self.seed_dir, {}) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertEqual(results['userdata'], None) + self.assertEqual(results['metadata'], {}) + + def test_seed_dir_broken_context(self): + populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) - def test_seed_dir_bad_context(self): - my_d = os.path.join(self.tmp, 'bad-context') - os.mkdir(my_d) - open(os.path.join(my_d, "context.sh"), "w").close() - self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d) + self.assertRaises(ds.BrokenContextDiskDir, + ds.read_context_disk_dir, + self.seed_dir) def test_context_parser(self): - my_d = os.path.join(self.tmp, 'context-parser') - populate_context_dir(my_d, TEST_VARS) - results = ds.read_context_disk_dir(my_d) + populate_context_dir(self.seed_dir, TEST_VARS) + results = ds.read_context_disk_dir(self.seed_dir) self.assertTrue('metadata' in results) self.assertEqual(TEST_VARS, results['metadata']) |