summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py92
1 files changed, 46 insertions, 46 deletions
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index af8bd347..bc6c4b73 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -3,7 +3,7 @@ from mocker import MockerTestCase
from cloudinit import util
from cloudinit.sources import DataSourceOpenNebula as ds
-TEST_VARS={
+TEST_VARS = {
'var1': 'single',
'var2': 'double word',
'var3': 'multi\nline\n',
@@ -12,18 +12,19 @@ TEST_VARS={
'var6': "'multi\nline\n'",
'var7': 'single\\t',
'var8': 'double\\tword',
- 'var9': 'multi\\t\nline\n' }
+ 'var9': 'multi\\t\nline\n'}
-USER_DATA='#cloud-config\napt_upgrade: true'
-SSH_KEY='ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
-HOSTNAME='foo.example.com'
-PUBLIC_IP='10.0.0.3'
+USER_DATA = '#cloud-config\napt_upgrade: true'
+SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
+HOSTNAME = 'foo.example.com'
+PUBLIC_IP = '10.0.0.3'
-CMD_IP_OUT='''\
+CMD_IP_OUT = '''\
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \ link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000\ link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
'''
+
class TestOpenNebulaDataSource(MockerTestCase):
def setUp(self):
@@ -32,57 +33,56 @@ class TestOpenNebulaDataSource(MockerTestCase):
def test_seed_dir_non_contextdisk(self):
my_d = os.path.join(self.tmp, 'non-contextdisk')
- self.assertRaises(ds.NonContextDiskDir,ds.read_context_disk_dir,my_d)
+ self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d)
def test_seed_dir_bad_context_sh(self):
my_d = os.path.join(self.tmp, 'bad-context-sh')
os.mkdir(my_d)
- with open(os.path.join(my_d,"context.sh"), "w") as fp:
+ with open(os.path.join(my_d, "context.sh"), "w") as fp:
fp.write('/bin/false\n')
fp.close()
- self.assertRaises(ds.NonContextDiskDir,ds.read_context_disk_dir,my_d)
+ self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, my_d)
def test_context_sh_parser(self):
- my_d = os.path.join(self.tmp,'context-sh-parser')
+ my_d = os.path.join(self.tmp, 'context-sh-parser')
populate_dir(my_d, TEST_VARS)
- results=ds.read_context_disk_dir(my_d)
+ results = ds.read_context_disk_dir(my_d)
self.assertTrue('metadata' in results)
- self.assertEqual(TEST_VARS,results['metadata'])
+ self.assertEqual(TEST_VARS, results['metadata'])
def test_ssh_key(self):
- public_keys=[]
+ public_keys = []
for c in range(4):
- for k in ('SSH_KEY','SSH_PUBLIC_KEY'):
- my_d = os.path.join(self.tmp, "%s-%i" % (k,c))
- populate_dir(my_d, {k:'\n'.join(public_keys)})
- results=ds.read_context_disk_dir(my_d)
+ for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
+ my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
+ populate_dir(my_d, {k: '\n'.join(public_keys)})
+ results = ds.read_context_disk_dir(my_d)
self.assertTrue('metadata' in results)
self.assertTrue('public-keys' in results['metadata'])
- self.assertEqual(public_keys,results['metadata']['public-keys'])
+ self.assertEqual(public_keys, results['metadata']['public-keys'])
- public_keys.append(SSH_KEY % (c+1,))
+ public_keys.append(SSH_KEY % (c + 1,))
def test_user_data(self):
- for k in ('USER_DATA','USERDATA'):
+ for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_dir(my_d, {k:USER_DATA})
- results=ds.read_context_disk_dir(my_d)
+ populate_dir(my_d, {k: USER_DATA})
+ results = ds.read_context_disk_dir(my_d)
self.assertTrue('userdata' in results)
- self.assertEqual(USER_DATA,results['userdata'])
+ self.assertEqual(USER_DATA, results['userdata'])
def test_hostname(self):
- for k in ('HOSTNAME','PUBLIC_IP','IP_PUBLIC','ETH0_IP'):
+ for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
my_d = os.path.join(self.tmp, k)
- populate_dir(my_d, {k:PUBLIC_IP})
- results=ds.read_context_disk_dir(my_d)
+ populate_dir(my_d, {k: PUBLIC_IP})
+ results = ds.read_context_disk_dir(my_d)
self.assertTrue('metadata' in results)
self.assertTrue('local-hostname' in results['metadata'])
- self.assertEqual(PUBLIC_IP,results['metadata']['local-hostname'])
-
+ self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
def test_find_candidates(self):
devs_with_answers = {
@@ -96,7 +96,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
try:
orig_find_devs_with = util.find_devs_with
util.find_devs_with = my_devs_with
- self.assertEqual(["/dev/sr0","/dev/vdb"], ds.find_candidate_devs())
+ self.assertEqual(["/dev/sr0", "/dev/vdb"], ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
@@ -107,15 +107,15 @@ class TestOpenNebulaNetwork(MockerTestCase):
super(TestOpenNebulaNetwork, self).setUp()
def test_lo(self):
- net=ds.OpenNebulaNetwork('',{})
- self.assertEqual(net.gen_conf(),u'''\
+ net = ds.OpenNebulaNetwork('', {})
+ self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
''')
def test_eth0(self):
- net=ds.OpenNebulaNetwork(CMD_IP_OUT,{})
- self.assertEqual(net.gen_conf(),u'''\
+ net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
+ self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
@@ -126,18 +126,18 @@ iface eth0 inet static
netmask 255.255.255.0
''')
- def test_eth0_override(self):
+ def test_eth0_override(self):
context_sh = {
'dns': '1.2.3.8',
- 'eth0_ip':'1.2.3.4',
- 'eth0_network':'1.2.3.0',
- 'eth0_mask':'255.255.0.0',
- 'eth0_gateway':'1.2.3.5',
- 'eth0_domain':'example.com',
- 'eth0_dns':'1.2.3.6 1.2.3.7'}
-
- net=ds.OpenNebulaNetwork(CMD_IP_OUT,context_sh)
- self.assertEqual(net.gen_conf(),u'''\
+ 'eth0_ip': '1.2.3.4',
+ 'eth0_network': '1.2.3.0',
+ 'eth0_mask': '255.255.0.0',
+ 'eth0_gateway': '1.2.3.5',
+ 'eth0_domain': 'example.com',
+ 'eth0_dns': '1.2.3.6 1.2.3.7'}
+
+ net = ds.OpenNebulaNetwork(CMD_IP_OUT, context_sh)
+ self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
@@ -154,10 +154,10 @@ iface eth0 inet static
def populate_dir(seed_dir, files):
os.mkdir(seed_dir)
- with open(os.path.join(seed_dir,"context.sh"), "w") as fp:
+ with open(os.path.join(seed_dir, "context.sh"), "w") as fp:
fp.write("# Context variables generated by OpenNebula\n")
for (name, content) in files.iteritems():
- fp.write('%s="%s"\n' % (name.upper(),content))
+ fp.write('%s="%s"\n' % (name.upper(), content))
fp.close()
# vi: ts=4 expandtab