diff options
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_aliyun.py | 148 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py | 338 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 23 |
3 files changed, 436 insertions, 73 deletions
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py new file mode 100644 index 00000000..6f1de072 --- /dev/null +++ b/tests/unittests/test_datasource/test_aliyun.py @@ -0,0 +1,148 @@ +import functools +import httpretty +import os + +from .. import helpers as test_helpers +from cloudinit import helpers +from cloudinit.sources import DataSourceAliYun as ay + +DEFAULT_METADATA = { + 'instance-id': 'aliyun-test-vm-00', + 'eipv4': '10.0.0.1', + 'hostname': 'test-hostname', + 'image-id': 'm-test', + 'launch-index': '0', + 'mac': '00:16:3e:00:00:00', + 'network-type': 'vpc', + 'private-ipv4': '192.168.0.1', + 'serial-number': 'test-string', + 'vpc-cidr-block': '192.168.0.0/16', + 'vpc-id': 'test-vpc', + 'vswitch-id': 'test-vpc', + 'vswitch-cidr-block': '192.168.0.0/16', + 'zone-id': 'test-zone-1', + 'ntp-conf': {'ntp_servers': [ + 'ntp1.aliyun.com', + 'ntp2.aliyun.com', + 'ntp3.aliyun.com']}, + 'source-address': ['http://mirrors.aliyun.com', + 'http://mirrors.aliyuncs.com'], + 'public-keys': {'key-pair-1': {'openssh-key': 'ssh-rsa AAAAB3...'}, + 'key-pair-2': {'openssh-key': 'ssh-rsa AAAAB3...'}} +} + +DEFAULT_USERDATA = """\ +#cloud-config + +hostname: localhost""" + + +def register_mock_metaserver(base_url, data): + def register_helper(register, base_url, body): + if isinstance(body, str): + register(base_url, body) + elif isinstance(body, list): + register(base_url.rstrip('/'), '\n'.join(body) + '\n') + elif isinstance(body, dict): + vals = [] + for k, v in body.items(): + if isinstance(v, (str, list)): + suffix = k.rstrip('/') + else: + suffix = k.rstrip('/') + '/' + vals.append(suffix) + url = base_url.rstrip('/') + '/' + suffix + register_helper(register, url, v) + register(base_url, '\n'.join(vals) + '\n') + + register = functools.partial(httpretty.register_uri, httpretty.GET) + register_helper(register, base_url, data) + + +class TestAliYunDatasource(test_helpers.HttprettyTestCase): + def setUp(self): + super(TestAliYunDatasource, self).setUp() + cfg = {'datasource': {'AliYun': {'timeout': '1', 'max_wait': '1'}}} + distro = {} + paths = helpers.Paths({}) + self.ds = ay.DataSourceAliYun(cfg, distro, paths) + self.metadata_address = self.ds.metadata_urls[0] + self.api_ver = self.ds.api_ver + + @property + def default_metadata(self): + return DEFAULT_METADATA + + @property + def default_userdata(self): + return DEFAULT_USERDATA + + @property + def metadata_url(self): + return os.path.join(self.metadata_address, + self.api_ver, 'meta-data') + '/' + + @property + def userdata_url(self): + return os.path.join(self.metadata_address, + self.api_ver, 'user-data') + + def regist_default_server(self): + register_mock_metaserver(self.metadata_url, self.default_metadata) + register_mock_metaserver(self.userdata_url, self.default_userdata) + + def _test_get_data(self): + self.assertEqual(self.ds.metadata, self.default_metadata) + self.assertEqual(self.ds.userdata_raw, + self.default_userdata.encode('utf8')) + + def _test_get_sshkey(self): + pub_keys = [v['openssh-key'] for (_, v) in + self.default_metadata['public-keys'].items()] + self.assertEqual(self.ds.get_public_ssh_keys(), pub_keys) + + def _test_get_iid(self): + self.assertEqual(self.default_metadata['instance-id'], + self.ds.get_instance_id()) + + def _test_host_name(self): + self.assertEqual(self.default_metadata['hostname'], + self.ds.get_hostname()) + + @httpretty.activate + def test_with_mock_server(self): + self.regist_default_server() + self.ds.get_data() + self._test_get_data() + self._test_get_sshkey() + self._test_get_iid() + self._test_host_name() + + def test_parse_public_keys(self): + public_keys = {} + self.assertEqual(ay.parse_public_keys(public_keys), []) + + public_keys = {'key-pair-0': 'ssh-key-0'} + self.assertEqual(ay.parse_public_keys(public_keys), + [public_keys['key-pair-0']]) + + public_keys = {'key-pair-0': 'ssh-key-0', 'key-pair-1': 'ssh-key-1'} + self.assertEqual(set(ay.parse_public_keys(public_keys)), + set([public_keys['key-pair-0'], + public_keys['key-pair-1']])) + + public_keys = {'key-pair-0': ['ssh-key-0', 'ssh-key-1']} + self.assertEqual(ay.parse_public_keys(public_keys), + public_keys['key-pair-0']) + + public_keys = {'key-pair-0': {'openssh-key': []}} + self.assertEqual(ay.parse_public_keys(public_keys), []) + + public_keys = {'key-pair-0': {'openssh-key': 'ssh-key-0'}} + self.assertEqual(ay.parse_public_keys(public_keys), + [public_keys['key-pair-0']['openssh-key']]) + + public_keys = {'key-pair-0': {'openssh-key': ['ssh-key-0', + 'ssh-key-1']}} + self.assertEqual(ay.parse_public_keys(public_keys), + public_keys['key-pair-0']['openssh-key']) diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index f5d2ef35..7bde0820 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -20,25 +20,123 @@ import json from cloudinit import helpers from cloudinit import settings from cloudinit.sources import DataSourceDigitalOcean +from cloudinit.sources.helpers import digitalocean -from .. import helpers as test_helpers -from ..helpers import HttprettyTestCase - -httpretty = test_helpers.import_httpretty() +from ..helpers import mock, TestCase DO_MULTIPLE_KEYS = ["ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@do.co", "ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@do.co"] DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... test@do.co" -DO_META = { - 'user_data': 'user_data_here', - 'vendor_data': 'vendor_data_here', - 'public_keys': DO_SINGLE_KEY, - 'region': 'nyc3', - 'id': '2000000', - 'hostname': 'cloudinit-test', +# the following JSON was taken from droplet (that's why its a string) +DO_META = json.loads(""" +{ + "droplet_id": "22532410", + "hostname": "utl-96268", + "vendor_data": "vendordata goes here", + "user_data": "userdata goes here", + "public_keys": "", + "auth_key": "authorization_key", + "region": "nyc3", + "interfaces": { + "private": [ + { + "ipv4": { + "ip_address": "10.132.6.205", + "netmask": "255.255.0.0", + "gateway": "10.132.0.1" + }, + "mac": "04:01:57:d1:9e:02", + "type": "private" + } + ], + "public": [ + { + "ipv4": { + "ip_address": "192.0.0.20", + "netmask": "255.255.255.0", + "gateway": "104.236.0.1" + }, + "ipv6": { + "ip_address": "2604:A880:0800:0000:1000:0000:0000:0000", + "cidr": 64, + "gateway": "2604:A880:0800:0000:0000:0000:0000:0001" + }, + "anchor_ipv4": { + "ip_address": "10.0.0.5", + "netmask": "255.255.0.0", + "gateway": "10.0.0.1" + }, + "mac": "04:01:57:d1:9e:01", + "type": "public" + } + ] + }, + "floating_ip": { + "ipv4": { + "active": false + } + }, + "dns": { + "nameservers": [ + "2001:4860:4860::8844", + "2001:4860:4860::8888", + "8.8.8.8" + ] + } +} +""") + +# This has no private interface +DO_META_2 = { + "droplet_id": 27223699, + "hostname": "smtest1", + "vendor_data": "\n".join([ + ('"Content-Type: multipart/mixed; ' + 'boundary=\"===============8645434374073493512==\"'), + 'MIME-Version: 1.0', + '', + '--===============8645434374073493512==', + 'MIME-Version: 1.0' + 'Content-Type: text/cloud-config; charset="us-ascii"' + 'Content-Transfer-Encoding: 7bit' + 'Content-Disposition: attachment; filename="cloud-config"' + '', + '#cloud-config', + 'disable_root: false', + 'manage_etc_hosts: true', + '', + '', + '--===============8645434374073493512==' + ]), + "public_keys": [ + "ssh-rsa AAAAB3NzaN...N3NtHw== smoser@brickies" + ], + "auth_key": "88888888888888888888888888888888", + "region": "nyc3", + "interfaces": { + "public": [{ + "ipv4": { + "ip_address": "45.55.249.133", + "netmask": "255.255.192.0", + "gateway": "45.55.192.1" + }, + "anchor_ipv4": { + "ip_address": "10.17.0.5", + "netmask": "255.255.0.0", + "gateway": "10.17.0.1" + }, + "mac": "ae:cc:08:7c:88:00", + "type": "public" + }] + }, + "floating_ip": {"ipv4": {"active": True, "ip_address": "138.197.59.92"}}, + "dns": {"nameservers": ["8.8.8.8", "8.8.4.4"]}, + "tags": None, } +DO_META['public_keys'] = DO_SINGLE_KEY + MD_URL = 'http://169.254.169.254/metadata/v1.json' @@ -46,69 +144,189 @@ def _mock_dmi(): return (True, DO_META.get('id')) -def _request_callback(method, uri, headers): - return (200, headers, json.dumps(DO_META)) - - -class TestDataSourceDigitalOcean(HttprettyTestCase): +class TestDataSourceDigitalOcean(TestCase): """ Test reading the meta-data """ - def setUp(self): - self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean( - settings.CFG_BUILTIN, None, - helpers.Paths({})) - self.ds._get_sysinfo = _mock_dmi - super(TestDataSourceDigitalOcean, self).setUp() - - @httpretty.activate - def test_connection(self): - httpretty.register_uri( - httpretty.GET, MD_URL, - body=json.dumps(DO_META)) - - success = self.ds.get_data() - self.assertTrue(success) - - @httpretty.activate - def test_metadata(self): - httpretty.register_uri( - httpretty.GET, MD_URL, - body=_request_callback) - self.ds.get_data() + def get_ds(self, get_sysinfo=_mock_dmi): + ds = DataSourceDigitalOcean.DataSourceDigitalOcean( + settings.CFG_BUILTIN, None, helpers.Paths({})) + ds.use_ip4LL = False + if get_sysinfo is not None: + ds._get_sysinfo = get_sysinfo + return ds - self.assertEqual(DO_META.get('user_data'), - self.ds.get_userdata_raw()) + @mock.patch('cloudinit.sources.helpers.digitalocean.read_sysinfo') + def test_returns_false_not_on_docean(self, m_read_sysinfo): + m_read_sysinfo.return_value = (False, None) + ds = self.get_ds(get_sysinfo=None) + self.assertEqual(False, ds.get_data()) + self.assertTrue(m_read_sysinfo.called) - self.assertEqual(DO_META.get('vendor_data'), - self.ds.get_vendordata_raw()) + @mock.patch('cloudinit.sources.helpers.digitalocean.read_metadata') + def test_metadata(self, mock_readmd): + mock_readmd.return_value = DO_META.copy() - self.assertEqual(DO_META.get('region'), - self.ds.availability_zone) + ds = self.get_ds() + ret = ds.get_data() + self.assertTrue(ret) - self.assertEqual(DO_META.get('id'), - self.ds.get_instance_id()) + self.assertTrue(mock_readmd.called) - self.assertEqual(DO_META.get('hostname'), - self.ds.get_hostname()) + self.assertEqual(DO_META.get('user_data'), ds.get_userdata_raw()) + self.assertEqual(DO_META.get('vendor_data'), ds.get_vendordata_raw()) + self.assertEqual(DO_META.get('region'), ds.availability_zone) + self.assertEqual(DO_META.get('droplet_id'), ds.get_instance_id()) + self.assertEqual(DO_META.get('hostname'), ds.get_hostname()) # Single key self.assertEqual([DO_META.get('public_keys')], - self.ds.get_public_ssh_keys()) + ds.get_public_ssh_keys()) - self.assertIsInstance(self.ds.get_public_ssh_keys(), list) + self.assertIsInstance(ds.get_public_ssh_keys(), list) - @httpretty.activate - def test_multiple_ssh_keys(self): - DO_META['public_keys'] = DO_MULTIPLE_KEYS - httpretty.register_uri( - httpretty.GET, MD_URL, - body=_request_callback) - self.ds.get_data() + @mock.patch('cloudinit.sources.helpers.digitalocean.read_metadata') + def test_multiple_ssh_keys(self, mock_readmd): + metadata = DO_META.copy() + metadata['public_keys'] = DO_MULTIPLE_KEYS + mock_readmd.return_value = metadata.copy() + + ds = self.get_ds() + ret = ds.get_data() + self.assertTrue(ret) + + self.assertTrue(mock_readmd.called) # Multiple keys - self.assertEqual(DO_META.get('public_keys'), - self.ds.get_public_ssh_keys()) + self.assertEqual(metadata['public_keys'], ds.get_public_ssh_keys()) + self.assertIsInstance(ds.get_public_ssh_keys(), list) + + +class TestNetworkConvert(TestCase): + + def _get_networking(self): + netcfg = digitalocean.convert_network_configuration( + DO_META['interfaces'], DO_META['dns']['nameservers']) + self.assertIn('config', netcfg) + return netcfg + + def test_networking_defined(self): + netcfg = self._get_networking() + self.assertIsNotNone(netcfg) + + for nic_def in netcfg.get('config'): + print(json.dumps(nic_def, indent=3)) + n_type = nic_def.get('type') + n_subnets = nic_def.get('type') + n_name = nic_def.get('name') + n_mac = nic_def.get('mac_address') + + self.assertIsNotNone(n_type) + self.assertIsNotNone(n_subnets) + self.assertIsNotNone(n_name) + self.assertIsNotNone(n_mac) + + def _get_nic_definition(self, int_type, expected_name): + """helper function to return if_type (i.e. public) and the expected + name used by cloud-init (i.e eth0)""" + netcfg = self._get_networking() + meta_def = (DO_META.get('interfaces')).get(int_type)[0] + + self.assertEqual(int_type, meta_def.get('type')) + + for nic_def in netcfg.get('config'): + print(nic_def) + if nic_def.get('name') == expected_name: + return nic_def, meta_def + + def _get_match_subn(self, subnets, ip_addr): + """get the matching subnet definition based on ip address""" + for subn in subnets: + address = subn.get('address') + self.assertIsNotNone(address) + + # equals won't work because of ipv6 addressing being in + # cidr notation, i.e fe00::1/64 + if ip_addr in address: + print(json.dumps(subn, indent=3)) + return subn + + def test_public_interface_defined(self): + """test that the public interface is defined as eth0""" + (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') + self.assertEqual('eth0', nic_def.get('name')) + self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address')) + self.assertEqual('physical', nic_def.get('type')) + + def test_private_interface_defined(self): + """test that the private interface is defined as eth1""" + (nic_def, meta_def) = self._get_nic_definition('private', 'eth1') + self.assertEqual('eth1', nic_def.get('name')) + self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address')) + self.assertEqual('physical', nic_def.get('type')) + + def _check_dns_nameservers(self, subn_def): + self.assertIn('dns_nameservers', subn_def) + expected_nameservers = DO_META['dns']['nameservers'] + nic_nameservers = subn_def.get('dns_nameservers') + self.assertEqual(expected_nameservers, nic_nameservers) + + def test_public_interface_ipv6(self): + """test public ipv6 addressing""" + (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') + ipv6_def = meta_def.get('ipv6') + self.assertIsNotNone(ipv6_def) + + subn_def = self._get_match_subn(nic_def.get('subnets'), + ipv6_def.get('ip_address')) + + cidr_notated_address = "{0}/{1}".format(ipv6_def.get('ip_address'), + ipv6_def.get('cidr')) + + self.assertEqual(cidr_notated_address, subn_def.get('address')) + self.assertEqual(ipv6_def.get('gateway'), subn_def.get('gateway')) + self._check_dns_nameservers(subn_def) + + def test_public_interface_ipv4(self): + """test public ipv4 addressing""" + (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') + ipv4_def = meta_def.get('ipv4') + self.assertIsNotNone(ipv4_def) + + subn_def = self._get_match_subn(nic_def.get('subnets'), + ipv4_def.get('ip_address')) + + self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask')) + self.assertEqual(ipv4_def.get('gateway'), subn_def.get('gateway')) + self._check_dns_nameservers(subn_def) + + def test_public_interface_anchor_ipv4(self): + """test public ipv4 addressing""" + (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') + ipv4_def = meta_def.get('anchor_ipv4') + self.assertIsNotNone(ipv4_def) + + subn_def = self._get_match_subn(nic_def.get('subnets'), + ipv4_def.get('ip_address')) + + self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask')) + self.assertNotIn('gateway', subn_def) + + def test_convert_without_private(self): + netcfg = digitalocean.convert_network_configuration( + DO_META_2['interfaces'], DO_META_2['dns']['nameservers']) - self.assertIsInstance(self.ds.get_public_ssh_keys(), list) + byname = {} + for i in netcfg['config']: + if 'name' in i: + if i['name'] in byname: + raise ValueError("name '%s' in config twice: %s" % + (i['name'], netcfg)) + byname[i['name']] = i + self.assertTrue('eth0' in byname) + self.assertTrue('subnets' in byname['eth0']) + eth0 = byname['eth0'] + self.assertEqual( + sorted(['45.55.249.133', '10.17.0.5']), + sorted([i['address'] for i in eth0['subnets']])) diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index d796f030..ce5b5550 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -1,7 +1,7 @@ from cloudinit import helpers from cloudinit.sources import DataSourceOpenNebula as ds from cloudinit import util -from ..helpers import TestCase, populate_dir +from ..helpers import mock, populate_dir, TestCase import os import pwd @@ -31,12 +31,7 @@ SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' HOSTNAME = 'foo.example.com' PUBLIC_IP = '10.0.0.3' -CMD_IP_OUT = '''\ -1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN - link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 -2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 - link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff -''' +DS_PATH = "cloudinit.sources.DataSourceOpenNebula" class TestOpenNebulaDataSource(TestCase): @@ -233,18 +228,19 @@ class TestOpenNebulaDataSource(TestCase): class TestOpenNebulaNetwork(unittest.TestCase): - def setUp(self): - super(TestOpenNebulaNetwork, self).setUp() + system_nics = {'02:00:0a:12:01:01': 'eth0'} def test_lo(self): - net = ds.OpenNebulaNetwork('', {}) + net = ds.OpenNebulaNetwork(context={}, system_nics_by_mac={}) self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback ''') - def test_eth0(self): - net = ds.OpenNebulaNetwork(CMD_IP_OUT, {}) + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_eth0(self, m_get_phys_by_mac): + m_get_phys_by_mac.return_value = self.system_nics + net = ds.OpenNebulaNetwork({}) self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback @@ -267,7 +263,8 @@ iface eth0 inet static 'ETH0_DNS': '1.2.3.6 1.2.3.7' } - net = ds.OpenNebulaNetwork(CMD_IP_OUT, context) + net = ds.OpenNebulaNetwork(context, + system_nics_by_mac=self.system_nics) self.assertEqual(net.gen_conf(), u'''\ auto lo iface lo inet loopback |