"""Tests for cloudinit.sources.DataSourceOpenNebula (imported here as ``ds``)."""
import os

from mocker import MockerTestCase

from cloudinit import util
from cloudinit.sources import DataSourceOpenNebula as ds

# Context variables written to context.sh by test_context_sh_parser; the
# parsed metadata is expected to compare equal to this dict.
TEST_VARS = {
    'var1': 'single',
    'var2': 'double word',
    'var3': 'multi\nline\n',
    'var4': "'single'",
    'var5': "'double word'",
    'var6': "'multi\nline\n'",
    'var7': 'single\\t',
    'var8': 'double\\tword',
    'var9': 'multi\\t\nline\n',
}

USER_DATA = '#cloud-config\napt_upgrade: true'
# The trailing %i is filled in with a per-key counter in test_ssh_key.
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
# Not referenced anywhere else in this file as shown.
HOSTNAME = 'foo.example.com'
PUBLIC_IP = '10.0.0.3'

# NOTE(review): this literal, and the expected gen_conf() strings in
# TestOpenNebulaNetwork, are reproduced exactly as they appear in the file:
# on a single line. The leading backslash reads like it was once a line
# continuation, so these were presumably multi-line originally -- confirm
# against the upstream source before trusting the network assertions.
CMD_IP_OUT = '''\ 1: lo: mtu 16436 qdisc noqueue state UNKNOWN \ link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 2: eth0: mtu 1500 qdisc mq state UP qlen 1000\ link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff '''


class TestOpenNebulaDataSource(MockerTestCase):
    """Exercise ds.read_context_disk_dir() and ds.find_candidate_devs()."""

    def setUp(self):
        super(TestOpenNebulaDataSource, self).setUp()
        # Scratch directory under which every test creates its seed dir.
        self.tmp = self.makeDir()

    def test_seed_dir_non_contextdisk(self):
        """A path that was never created must raise NonContextDiskDir."""
        my_d = os.path.join(self.tmp, 'non-contextdisk')
        self.assertRaises(ds.NonContextDiskDir,
                          ds.read_context_disk_dir, my_d)

    def test_seed_dir_bad_context_sh(self):
        """A context.sh containing only '/bin/false' must be rejected."""
        my_d = os.path.join(self.tmp, 'bad-context-sh')
        os.mkdir(my_d)
        # The with-block closes the file; no explicit close() is needed.
        with open(os.path.join(my_d, "context.sh"), "w") as fp:
            fp.write('/bin/false\n')
        self.assertRaises(ds.NonContextDiskDir,
                          ds.read_context_disk_dir, my_d)

    def test_context_sh_parser(self):
        """Values written via populate_dir() come back as 'metadata'."""
        my_d = os.path.join(self.tmp, 'context-sh-parser')
        populate_dir(my_d, TEST_VARS)
        results = ds.read_context_disk_dir(my_d)

        self.assertTrue('metadata' in results)
        self.assertEqual(TEST_VARS, results['metadata'])

    def test_ssh_key(self):
        """SSH_KEY / SSH_PUBLIC_KEY map to metadata['public-keys'].

        Runs with 0, 1, 2 and 3 newline-joined keys for each variable name.
        """
        public_keys = []
        for c in range(4):
            for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
                my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
                populate_dir(my_d, {k: '\n'.join(public_keys)})
                results = ds.read_context_disk_dir(my_d)

                self.assertTrue('metadata' in results)
                self.assertTrue('public-keys' in results['metadata'])
                self.assertEqual(public_keys,
                                 results['metadata']['public-keys'])

            # Grow the key list by one before the next round.
            public_keys.append(SSH_KEY % (c + 1,))

    def test_user_data(self):
        """USER_DATA and USERDATA are both surfaced as 'userdata'."""
        for k in ('USER_DATA', 'USERDATA'):
            my_d = os.path.join(self.tmp, k)
            populate_dir(my_d, {k: USER_DATA})
            results = ds.read_context_disk_dir(my_d)

            self.assertTrue('userdata' in results)
            self.assertEqual(USER_DATA, results['userdata'])

    def test_hostname(self):
        """Each listed variable populates metadata['local-hostname']."""
        for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
            my_d = os.path.join(self.tmp, k)
            populate_dir(my_d, {k: PUBLIC_IP})
            results = ds.read_context_disk_dir(my_d)

            self.assertTrue('metadata' in results)
            self.assertTrue('local-hostname' in results['metadata'])
            self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])

    def test_find_candidates(self):
        """find_candidate_devs() returns ['/dev/sr0', '/dev/vdb'] here.

        util.find_devs_with is temporarily replaced by a dict-backed stub.
        """
        devs_with_answers = {
            "TYPE=iso9660": ["/dev/vdb"],
            "LABEL=CDROM": ["/dev/sr0"],
        }

        def my_devs_with(criteria):
            return devs_with_answers[criteria]

        # Save the real function before entering the try so the finally
        # clause always has a bound name to restore.
        orig_find_devs_with = util.find_devs_with
        try:
            util.find_devs_with = my_devs_with
            self.assertEqual(["/dev/sr0", "/dev/vdb"],
                             ds.find_candidate_devs())
        finally:
            util.find_devs_with = orig_find_devs_with


class TestOpenNebulaNetwork(MockerTestCase):
    """Check the text produced by ds.OpenNebulaNetwork(...).gen_conf()."""

    def setUp(self):
        super(TestOpenNebulaNetwork, self).setUp()

    def test_lo(self):
        """Empty 'ip' output and empty context: loopback stanza only."""
        net = ds.OpenNebulaNetwork('', {})
        self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback ''')

    def test_eth0(self):
        """eth0 from CMD_IP_OUT with an empty context.

        NOTE(review): the expected 10.18.1.1 matches the last four octets of
        the MAC in CMD_IP_OUT (0a:12:01:01); presumably the address is
        derived from it -- confirm in the datasource.
        """
        net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
        self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback auto eth0 iface eth0 inet static address 10.18.1.1 network 10.18.1.0 netmask 255.255.255.0 ''')

    def test_eth0_override(self):
        """Context values override the eth0 settings and add DNS entries."""
        context_sh = {
            'dns': '1.2.3.8',
            'eth0_ip': '1.2.3.4',
            'eth0_network': '1.2.3.0',
            'eth0_mask': '255.255.0.0',
            'eth0_gateway': '1.2.3.5',
            'eth0_domain': 'example.com',
            'eth0_dns': '1.2.3.6 1.2.3.7',
        }
        net = ds.OpenNebulaNetwork(CMD_IP_OUT, context_sh)
        self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback auto eth0 iface eth0 inet static address 1.2.3.4 network 1.2.3.0 netmask 255.255.0.0 gateway 1.2.3.5 dns-search example.com dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7 ''')


def populate_dir(seed_dir, files):
    """Create *seed_dir* and write a context.sh file into it.

    seed_dir: directory to create; os.mkdir raises if it already exists.
    files: mapping of variable name -> value. Each entry is written as
        NAME="value" with the name upper-cased. Values are not escaped.
    """
    os.mkdir(seed_dir)
    with open(os.path.join(seed_dir, "context.sh"), "w") as fp:
        fp.write("# Context variables generated by OpenNebula\n")
        # items() works on both Python 2 and 3; iteritems() is Python 2 only.
        for (name, content) in files.items():
            fp.write('%s="%s"\n' % (name.upper(), content))

# vi: ts=4 expandtab