summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_opennebula.py
diff options
context:
space:
mode:
authorVlastimil Holer <vlastimil.holer@gmail.com>2013-09-10 15:37:15 +0200
committerVlastimil Holer <vlastimil.holer@gmail.com>2013-09-10 15:37:15 +0200
commit64a2f3c7d4eae1354134c021db232c117eb1c772 (patch)
tree58953301347055f991ead4dee48e93a96f41b528 /tests/unittests/test_datasource/test_opennebula.py
parent7f3b1198da74430345d69e485326b03a3fa5b455 (diff)
downloadvyos-cloud-init-64a2f3c7d4eae1354134c021db232c117eb1c772.tar.gz
vyos-cloud-init-64a2f3c7d4eae1354134c021db232c117eb1c772.zip
Configurable OpenNebula::parseuser. Seed search dir+dev merge.
Eat shell parser error output. Few tests for tests for get_data.
Diffstat (limited to 'tests/unittests/test_datasource/test_opennebula.py')
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py95
1 files changed, 85 insertions, 10 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index f544e145..752638b6 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,4 +1,5 @@
from cloudinit.sources import DataSourceOpenNebula as ds
+from cloudinit import helpers
from cloudinit import util
from mocker import MockerTestCase
from tests.unittests.helpers import populate_dir
@@ -20,6 +21,8 @@ TEST_VARS = {
'VAR12': '$', # expect $
}
+INVALID_PARSEUSER = 'cloud-init-mocker-opennebula-invalid'
+INVALID_CONTEXT = ';'
USER_DATA = '#cloud-config\napt_upgrade: true'
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
HOSTNAME = 'foo.example.com'
@@ -38,21 +41,93 @@ class TestOpenNebulaDataSource(MockerTestCase):
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
self.tmp = self.makeDir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ # defaults for few tests
+ self.ds = ds.DataSourceOpenNebula
+ self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
+ self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
+
+ def test_get_data_non_contextdisk(self):
+ try:
+ # dont' try to lookup for CDs
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = lambda n: []
+
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data_broken_contextdisk(self):
+ try:
+ # dont' try to lookup for CDs
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = lambda n: []
+
+ populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data_invalid_identity(self):
+ try:
+ # dont' try to lookup for CDs
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = lambda n: []
+
+ sys_cfg = self.sys_cfg
+ sys_cfg['datasource']['OpenNebula']['parseuser'] = \
+ INVALID_PARSEUSER
+
+ populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
+ dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data(self):
+ try:
+ # dont' try to lookup for CDs
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = lambda n: []
+ populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ finally:
+ util.find_devs_with = orig_find_devs_with
def test_seed_dir_non_contextdisk(self):
- my_d = os.path.join(self.tmp, 'non-contextdisk')
- self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d)
+ self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
+ self.seed_dir)
+
+ def test_seed_dir_empty1_context(self):
+ populate_dir(self.seed_dir, {'context.sh': ''})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertEqual(results['userdata'], None)
+ self.assertEqual(results['metadata'], {})
+
+ def test_seed_dir_empty2_context(self):
+ populate_context_dir(self.seed_dir, {})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertEqual(results['userdata'], None)
+ self.assertEqual(results['metadata'], {})
+
+ def test_seed_dir_broken_context(self):
+ populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
- def test_seed_dir_bad_context(self):
- my_d = os.path.join(self.tmp, 'bad-context')
- os.mkdir(my_d)
- open(os.path.join(my_d, "context.sh"), "w").close()
- self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d)
+ self.assertRaises(ds.BrokenContextDiskDir,
+ ds.read_context_disk_dir,
+ self.seed_dir)
def test_context_parser(self):
- my_d = os.path.join(self.tmp, 'context-parser')
- populate_context_dir(my_d, TEST_VARS)
- results = ds.read_context_disk_dir(my_d)
+ populate_context_dir(self.seed_dir, TEST_VARS)
+ results = ds.read_context_disk_dir(self.seed_dir)
self.assertTrue('metadata' in results)
self.assertEqual(TEST_VARS, results['metadata'])