diff options
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 92 |
1 files changed, 46 insertions, 46 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index af8bd347..bc6c4b73 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -3,7 +3,7 @@ from mocker import MockerTestCase from cloudinit import util from cloudinit.sources import DataSourceOpenNebula as ds -TEST_VARS={ +TEST_VARS = { 'var1': 'single', 'var2': 'double word', 'var3': 'multi\nline\n', @@ -12,18 +12,19 @@ TEST_VARS={ 'var6': "'multi\nline\n'", 'var7': 'single\\t', 'var8': 'double\\tword', - 'var9': 'multi\\t\nline\n' } + 'var9': 'multi\\t\nline\n'} -USER_DATA='#cloud-config\napt_upgrade: true' -SSH_KEY='ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' -HOSTNAME='foo.example.com' -PUBLIC_IP='10.0.0.3' +USER_DATA = '#cloud-config\napt_upgrade: true' +SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' +HOSTNAME = 'foo.example.com' +PUBLIC_IP = '10.0.0.3' -CMD_IP_OUT='''\ +CMD_IP_OUT = '''\ 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \ link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000\ link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff ''' + class TestOpenNebulaDataSource(MockerTestCase): def setUp(self): @@ -32,57 +33,56 @@ class TestOpenNebulaDataSource(MockerTestCase): def test_seed_dir_non_contextdisk(self): my_d = os.path.join(self.tmp, 'non-contextdisk') - self.assertRaises(ds.NonContextDiskDir,ds.read_context_disk_dir,my_d) + self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d) def test_seed_dir_bad_context_sh(self): my_d = os.path.join(self.tmp, 'bad-context-sh') os.mkdir(my_d) - with open(os.path.join(my_d,"context.sh"), "w") as fp: + with open(os.path.join(my_d, "context.sh"), "w") as fp: fp.write('/bin/false\n') fp.close() - self.assertRaises(ds.NonContextDiskDir,ds.read_context_disk_dir,my_d) + self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d) def test_context_sh_parser(self): - my_d = os.path.join(self.tmp,'context-sh-parser') + my_d = os.path.join(self.tmp, 'context-sh-parser') populate_dir(my_d, TEST_VARS) - results=ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d) self.assertTrue('metadata' in results) - self.assertEqual(TEST_VARS,results['metadata']) + self.assertEqual(TEST_VARS, results['metadata']) def test_ssh_key(self): - public_keys=[] + public_keys = [] for c in range(4): - for k in ('SSH_KEY','SSH_PUBLIC_KEY'): - my_d = os.path.join(self.tmp, "%s-%i" % (k,c)) - populate_dir(my_d, {k:'\n'.join(public_keys)}) - results=ds.read_context_disk_dir(my_d) + for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'): + my_d = os.path.join(self.tmp, "%s-%i" % (k, c)) + populate_dir(my_d, {k: '\n'.join(public_keys)}) + results = ds.read_context_disk_dir(my_d) self.assertTrue('metadata' in results) self.assertTrue('public-keys' in results['metadata']) - self.assertEqual(public_keys,results['metadata']['public-keys']) + self.assertEqual(public_keys, results['metadata']['public-keys']) - public_keys.append(SSH_KEY % (c+1,)) + public_keys.append(SSH_KEY % (c + 1,)) def test_user_data(self): - for k in ('USER_DATA','USERDATA'): + for k in ('USER_DATA', 'USERDATA'): my_d = os.path.join(self.tmp, k) - populate_dir(my_d, {k:USER_DATA}) - results=ds.read_context_disk_dir(my_d) + populate_dir(my_d, {k: USER_DATA}) + results = ds.read_context_disk_dir(my_d) self.assertTrue('userdata' in results) - self.assertEqual(USER_DATA,results['userdata']) + self.assertEqual(USER_DATA, results['userdata']) def test_hostname(self): - for k in ('HOSTNAME','PUBLIC_IP','IP_PUBLIC','ETH0_IP'): + for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'): my_d = os.path.join(self.tmp, k) - populate_dir(my_d, {k:PUBLIC_IP}) - results=ds.read_context_disk_dir(my_d) + populate_dir(my_d, {k: PUBLIC_IP}) + results = ds.read_context_disk_dir(my_d) self.assertTrue('metadata' in results) self.assertTrue('local-hostname' in results['metadata']) - self.assertEqual(PUBLIC_IP,results['metadata']['local-hostname']) - + self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname']) def test_find_candidates(self): devs_with_answers = { @@ -96,7 +96,7 @@ class TestOpenNebulaDataSource(MockerTestCase): try: orig_find_devs_with = util.find_devs_with util.find_devs_with = my_devs_with - self.assertEqual(["/dev/sr0","/dev/vdb"], ds.find_candidate_devs()) + self.assertEqual(["/dev/sr0", "/dev/vdb"], ds.find_candidate_devs()) finally: util.find_devs_with = orig_find_devs_with @@ -107,15 +107,15 @@ class TestOpenNebulaNetwork(MockerTestCase): super(TestOpenNebulaNetwork, self).setUp() def test_lo(self): - net=ds.OpenNebulaNetwork('',{}) - self.assertEqual(net.gen_conf(),u'''\ + net = ds.OpenNebulaNetwork('', {}) + self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback ''') def test_eth0(self): - net=ds.OpenNebulaNetwork(CMD_IP_OUT,{}) - self.assertEqual(net.gen_conf(),u'''\ + net = ds.OpenNebulaNetwork(CMD_IP_OUT, {}) + self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback @@ -126,18 +126,18 @@ iface eth0 inet static netmask 255.255.255.0 ''') - def test_eth0_override(self): + def test_eth0_override(self): context_sh = { 'dns': '1.2.3.8', - 'eth0_ip':'1.2.3.4', - 'eth0_network':'1.2.3.0', - 'eth0_mask':'255.255.0.0', - 'eth0_gateway':'1.2.3.5', - 'eth0_domain':'example.com', - 'eth0_dns':'1.2.3.6 1.2.3.7'} - - net=ds.OpenNebulaNetwork(CMD_IP_OUT,context_sh) - self.assertEqual(net.gen_conf(),u'''\ + 'eth0_ip': '1.2.3.4', + 'eth0_network': '1.2.3.0', + 'eth0_mask': '255.255.0.0', + 'eth0_gateway': '1.2.3.5', + 'eth0_domain': 'example.com', + 'eth0_dns': '1.2.3.6 1.2.3.7'} + + net = ds.OpenNebulaNetwork(CMD_IP_OUT, context_sh) + self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback @@ -154,10 +154,10 @@ iface eth0 inet static def populate_dir(seed_dir, files): os.mkdir(seed_dir) - with open(os.path.join(seed_dir,"context.sh"), "w") as fp: + with open(os.path.join(seed_dir, "context.sh"), "w") as fp: fp.write("# Context variables generated by OpenNebula\n") for (name, content) in files.iteritems(): - fp.write('%s="%s"\n' % (name.upper(),content)) + fp.write('%s="%s"\n' % (name.upper(), content)) fp.close() # vi: ts=4 expandtab |