From 1f8183ff4750cc7f8798749987ef10912719544d Mon Sep 17 00:00:00 2001 From: Maitreyee Saikia Date: Tue, 15 Aug 2017 09:33:50 -0600 Subject: vcloud directory: Guest Customization support for passwords This feature enables the following VMware VCloud Director functionality: 1. Setting admin password 2. Expire password. 3. Set admin password and expire. Password configuration is triggered only as part of a full recustomization, that happens either on first power on or when "poweron and full recustomization" is selected. Full customization flow is determined by marker files. Unique marker ids are generated when full recustomization is requested. And marker file based on these marker ids help to determine if we need to execute the above configuration. --- cloudinit/sources/helpers/vmware/imc/config.py | 24 +++++++- .../sources/helpers/vmware/imc/config_passwd.py | 67 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 cloudinit/sources/helpers/vmware/imc/config_passwd.py (limited to 'cloudinit/sources/helpers/vmware') diff --git a/cloudinit/sources/helpers/vmware/imc/config.py b/cloudinit/sources/helpers/vmware/imc/config.py index 9a5e3a8a..49d441db 100644 --- a/cloudinit/sources/helpers/vmware/imc/config.py +++ b/cloudinit/sources/helpers/vmware/imc/config.py @@ -5,6 +5,7 @@ # # This file is part of cloud-init. See LICENSE file for license information. + from .nic import Nic @@ -14,13 +15,16 @@ class Config(object): Specification file. """ + CUSTOM_SCRIPT = 'CUSTOM-SCRIPT|SCRIPT-NAME' DNS = 'DNS|NAMESERVER|' - SUFFIX = 'DNS|SUFFIX|' + DOMAINNAME = 'NETWORK|DOMAINNAME' + HOSTNAME = 'NETWORK|HOSTNAME' + MARKERID = 'MISC|MARKER-ID' PASS = 'PASSWORD|-PASS' + RESETPASS = 'PASSWORD|RESET' + SUFFIX = 'DNS|SUFFIX|' TIMEZONE = 'DATETIME|TIMEZONE' UTC = 'DATETIME|UTC' - HOSTNAME = 'NETWORK|HOSTNAME' - DOMAINNAME = 'NETWORK|DOMAINNAME' def __init__(self, configFile): self._configFile = configFile @@ -82,4 +86,18 @@ class Config(object): return res + @property + def reset_password(self): + """Retreives if the root password needs to be reset.""" + resetPass = self._configFile.get(Config.RESETPASS, 'no') + resetPass = resetPass.lower() + if resetPass not in ('yes', 'no'): + raise ValueError('ResetPassword value should be yes/no') + return resetPass == 'yes' + + @property + def marker_id(self): + """Returns marker id.""" + return self._configFile.get(Config.MARKERID, None) + # vi: ts=4 expandtab diff --git a/cloudinit/sources/helpers/vmware/imc/config_passwd.py b/cloudinit/sources/helpers/vmware/imc/config_passwd.py new file mode 100644 index 00000000..75cfbaaf --- /dev/null +++ b/cloudinit/sources/helpers/vmware/imc/config_passwd.py @@ -0,0 +1,67 @@ +# Copyright (C) 2016 Canonical Ltd. +# Copyright (C) 2016 VMware INC. +# +# Author: Maitreyee Saikia +# +# This file is part of cloud-init. See LICENSE file for license information. + + +import logging +import os + +from cloudinit import util + +LOG = logging.getLogger(__name__) + + +class PasswordConfigurator(object): + """ + Class for changing configurations related to passwords in a VM. Includes + setting and expiring passwords. + """ + def configure(self, passwd, resetPasswd, distro): + """ + Main method to perform all functionalities based on configuration file + inputs. + @param passwd: encoded admin password. + @param resetPasswd: boolean to determine if password needs to be reset. + @return cfg: dict to be used by cloud-init set_passwd code. + """ + LOG.info('Starting password configuration') + if passwd: + passwd = util.b64d(passwd) + allRootUsers = [] + for line in open('/etc/passwd', 'r'): + if line.split(':')[2] == '0': + allRootUsers.append(line.split(':')[0]) + # read shadow file and check for each user, if its uid0 or root. + uidUsersList = [] + for line in open('/etc/shadow', 'r'): + user = line.split(':')[0] + if user in allRootUsers: + uidUsersList.append(user) + if passwd: + LOG.info('Setting admin password') + distro.set_passwd('root', passwd) + if resetPasswd: + self.reset_password(uidUsersList) + LOG.info('Configure Password completed!') + + def reset_password(self, uidUserList): + """ + Method to reset password. Use passwd --expire command. Use chage if + not succeeded using passwd command. Log failure message otherwise. + @param: list of users for which to expire password. + """ + LOG.info('Expiring password.') + for user in uidUserList: + try: + out, err = util.subp(['passwd', '--expire', user]) + except util.ProcessExecutionError as e: + if os.path.exists('/usr/bin/chage'): + out, e = util.subp(['chage', '-d', '0', user]) + else: + LOG.warning('Failed to expire password for %s with error: ' + '%s', user, e) + +# vi: ts=4 expandtab -- cgit v1.2.3 From a1dfdda2a2ae20fe026881980ddf7d16110f06e2 Mon Sep 17 00:00:00 2001 From: Sankar Tanguturi Date: Thu, 7 Sep 2017 22:16:16 -0600 Subject: vmware customization: return network config format For customizing the machines hosted on 'VMWare' hypervisor, the datasource should return the 'network config' data in 'curtin' format. This branch also fixes /etc/network/interfaces replacing the line "source /etc/network/interfaces.d/*.cfg" which is incorrectly removed when VMWare's Perl Customization Engine writes /etc/network/interfaces. Modify the code to read the customization configuration and return the converted data. Added few tests. LP: #1675063 --- cloudinit/sources/DataSourceOVF.py | 91 ++++++--- cloudinit/sources/helpers/vmware/imc/config_nic.py | 201 ++++++++++++------- .../sources/helpers/vmware/imc/guestcust_util.py | 12 +- tests/unittests/test_vmware_config_file.py | 217 +++++++++++++++++++++ 4 files changed, 418 insertions(+), 103 deletions(-) (limited to 'cloudinit/sources/helpers/vmware') diff --git a/cloudinit/sources/DataSourceOVF.py b/cloudinit/sources/DataSourceOVF.py index 73d38771..aa5f798d 100644 --- a/cloudinit/sources/DataSourceOVF.py +++ b/cloudinit/sources/DataSourceOVF.py @@ -51,6 +51,10 @@ class DataSourceOVF(sources.DataSource): self.cfg = {} self.supported_seed_starts = ("/", "file://") self.vmware_customization_supported = True + self._network_config = None + self._vmware_nics_to_enable = None + self._vmware_cust_conf = None + self._vmware_cust_found = False def __str__(self): root = sources.DataSource.__str__(self) @@ -60,8 +64,8 @@ class DataSourceOVF(sources.DataSource): found = [] md = {} ud = "" - vmwarePlatformFound = False - vmwareImcConfigFilePath = '' + vmwareImcConfigFilePath = None + nicspath = None defaults = { "instance-id": "iid-dsovf", @@ -101,25 +105,26 @@ class DataSourceOVF(sources.DataSource): logfunc=LOG.debug, msg="waiting for configuration file", func=wait_for_imc_cfg_file, - args=("/var/run/vmware-imc", "cust.cfg", max_wait)) + args=("cust.cfg", max_wait)) if vmwareImcConfigFilePath: LOG.debug("Found VMware Customization Config File at %s", vmwareImcConfigFilePath) + nicspath = wait_for_imc_cfg_file( + filename="nics.txt", maxwait=10, naplen=5) else: LOG.debug("Did not find VMware Customization Config File") else: LOG.debug("Customization for VMware platform is disabled.") if vmwareImcConfigFilePath: - nics = "" + self._vmware_nics_to_enable = "" try: cf = ConfigFile(vmwareImcConfigFilePath) - conf = Config(cf) - (md, ud, cfg) = read_vmware_imc(conf) - dirpath = os.path.dirname(vmwareImcConfigFilePath) - nics = get_nics_to_enable(dirpath) - markerid = conf.marker_id + self._vmware_cust_conf = Config(cf) + (md, ud, cfg) = read_vmware_imc(self._vmware_cust_conf) + self._vmware_nics_to_enable = get_nics_to_enable(nicspath) + markerid = self._vmware_cust_conf.marker_id markerexists = check_marker_exists(markerid) except Exception as e: LOG.debug("Error parsing the customization Config File") @@ -127,28 +132,29 @@ class DataSourceOVF(sources.DataSource): set_customization_status( GuestCustStateEnum.GUESTCUST_STATE_RUNNING, GuestCustEventEnum.GUESTCUST_EVENT_CUSTOMIZE_FAILED) - enable_nics(nics) - return False + raise e finally: util.del_dir(os.path.dirname(vmwareImcConfigFilePath)) try: - LOG.debug("Applying the Network customization") - nicConfigurator = NicConfigurator(conf.nics) - nicConfigurator.configure() + LOG.debug("Preparing the Network configuration") + self._network_config = get_network_config_from_conf( + self._vmware_cust_conf, + True, + True, + self.distro.osfamily) except Exception as e: - LOG.debug("Error applying the Network Configuration") LOG.exception(e) set_customization_status( GuestCustStateEnum.GUESTCUST_STATE_RUNNING, GuestCustEventEnum.GUESTCUST_EVENT_NETWORK_SETUP_FAILED) - enable_nics(nics) - return False + raise e + if markerid and not markerexists: LOG.debug("Applying password customization") pwdConfigurator = PasswordConfigurator() - adminpwd = conf.admin_password + adminpwd = self._vmware_cust_conf.admin_password try: - resetpwd = conf.reset_password + resetpwd = self._vmware_cust_conf.reset_password if adminpwd or resetpwd: pwdConfigurator.configure(adminpwd, resetpwd, self.distro) @@ -159,7 +165,6 @@ class DataSourceOVF(sources.DataSource): set_customization_status( GuestCustStateEnum.GUESTCUST_STATE_RUNNING, GuestCustEventEnum.GUESTCUST_EVENT_CUSTOMIZE_FAILED) - enable_nics(nics) return False if markerid: LOG.debug("Handle marker creation") @@ -170,14 +175,18 @@ class DataSourceOVF(sources.DataSource): set_customization_status( GuestCustStateEnum.GUESTCUST_STATE_RUNNING, GuestCustEventEnum.GUESTCUST_EVENT_CUSTOMIZE_FAILED) - enable_nics(nics) return False - vmwarePlatformFound = True + self._vmware_cust_found = True + found.append('vmware-tools') + + # TODO: Need to set the status to DONE only when the + # customization is done successfully. set_customization_status( GuestCustStateEnum.GUESTCUST_STATE_DONE, GuestCustErrorEnum.GUESTCUST_ERROR_SUCCESS) - enable_nics(nics) + enable_nics(self._vmware_nics_to_enable) + else: np = {'iso': transport_iso9660, 'vmware-guestd': transport_vmware_guestd, } @@ -192,7 +201,7 @@ class DataSourceOVF(sources.DataSource): found.append(name) # There was no OVF transports found - if len(found) == 0 and not vmwarePlatformFound: + if len(found) == 0: return False if 'seedfrom' in md and md['seedfrom']: @@ -237,6 +246,10 @@ class DataSourceOVF(sources.DataSource): def get_config_obj(self): return self.cfg + @property + def network_config(self): + return self._network_config + class DataSourceOVFNet(DataSourceOVF): def __init__(self, sys_cfg, distro, paths): @@ -268,12 +281,13 @@ def get_max_wait_from_cfg(cfg): return max_wait -def wait_for_imc_cfg_file(dirpath, filename, maxwait=180, naplen=5): +def wait_for_imc_cfg_file(filename, maxwait=180, naplen=5, + dirpath="/var/run/vmware-imc"): waited = 0 while waited < maxwait: - fileFullPath = search_file(dirpath, filename) - if fileFullPath: + fileFullPath = os.path.join(dirpath, filename) + if os.path.isfile(fileFullPath): return fileFullPath LOG.debug("Waiting for VMware Customization Config File") time.sleep(naplen) @@ -281,6 +295,26 @@ def wait_for_imc_cfg_file(dirpath, filename, maxwait=180, naplen=5): return None +def get_network_config_from_conf(config, use_system_devices=True, + configure=False, osfamily=None): + nicConfigurator = NicConfigurator(config.nics, use_system_devices) + nics_cfg_list = nicConfigurator.generate(configure, osfamily) + + return get_network_config(nics_cfg_list, + config.name_servers, + config.dns_suffixes) + + +def get_network_config(nics=None, nameservers=None, search=None): + config_list = nics + + if nameservers or search: + config_list.append({'type': 'nameserver', 'address': nameservers, + 'search': search}) + + return {'version': 1, 'config': config_list} + + # This will return a dict with some content # meta-data, user-data, some config def read_vmware_imc(config): @@ -296,6 +330,9 @@ def read_vmware_imc(config): if config.timezone: cfg['timezone'] = config.timezone + # Generate a unique instance-id so that re-customization will + # happen in cloud-init + md['instance-id'] = "iid-vmware-" + util.rand_str(strlen=8) return (md, ud, cfg) diff --git a/cloudinit/sources/helpers/vmware/imc/config_nic.py b/cloudinit/sources/helpers/vmware/imc/config_nic.py index 67ac21db..2fb07c59 100644 --- a/cloudinit/sources/helpers/vmware/imc/config_nic.py +++ b/cloudinit/sources/helpers/vmware/imc/config_nic.py @@ -9,22 +9,48 @@ import logging import os import re +from cloudinit.net.network_state import mask_to_net_prefix from cloudinit import util logger = logging.getLogger(__name__) +def gen_subnet(ip, netmask): + """ + Return the subnet for a given ip address and a netmask + @return (str): the subnet + @param ip: ip address + @param netmask: netmask + """ + ip_array = ip.split(".") + mask_array = netmask.split(".") + result = [] + for index in list(range(4)): + result.append(int(ip_array[index]) & int(mask_array[index])) + + return ".".join([str(x) for x in result]) + + class NicConfigurator(object): - def __init__(self, nics): + def __init__(self, nics, use_system_devices=True): """ Initialize the Nic Configurator @param nics (list) an array of nics to configure + @param use_system_devices (Bool) Get the MAC names from the system + if this is True. If False, then mac names will be retrieved from + the specified nics. """ self.nics = nics self.mac2Name = {} self.ipv4PrimaryGateway = None self.ipv6PrimaryGateway = None - self.find_devices() + + if use_system_devices: + self.find_devices() + else: + for nic in self.nics: + self.mac2Name[nic.mac.lower()] = nic.name + self._primaryNic = self.get_primary_nic() def get_primary_nic(self): @@ -61,138 +87,163 @@ class NicConfigurator(object): def gen_one_nic(self, nic): """ - Return the lines needed to configure a nic - @return (str list): the string list to configure the nic + Return the config list needed to configure a nic + @return (list): the subnets and routes list to configure the nic @param nic (NicBase): the nic to configure """ - lines = [] - name = self.mac2Name.get(nic.mac.lower()) + mac = nic.mac.lower() + name = self.mac2Name.get(mac) if not name: raise ValueError('No known device has MACADDR: %s' % nic.mac) - if nic.onboot: - lines.append('auto %s' % name) + nics_cfg_list = [] + + cfg = {'type': 'physical', 'name': name, 'mac_address': mac} + + subnet_list = [] + route_list = [] # Customize IPv4 - lines.extend(self.gen_ipv4(name, nic)) + (subnets, routes) = self.gen_ipv4(name, nic) + subnet_list.extend(subnets) + route_list.extend(routes) # Customize IPv6 - lines.extend(self.gen_ipv6(name, nic)) + (subnets, routes) = self.gen_ipv6(name, nic) + subnet_list.extend(subnets) + route_list.extend(routes) + + cfg.update({'subnets': subnet_list}) - lines.append('') + nics_cfg_list.append(cfg) + if route_list: + nics_cfg_list.extend(route_list) - return lines + return nics_cfg_list def gen_ipv4(self, name, nic): """ - Return the lines needed to configure the IPv4 setting of a nic - @return (str list): the string list to configure the gateways - @param name (str): name of the nic + Return the set of subnets and routes needed to configure the + IPv4 settings of a nic + @return (set): the set of subnet and routes to configure the gateways + @param name (str): subnet and route list for the nic @param nic (NicBase): the nic to configure """ - lines = [] + + subnet = {} + route_list = [] + + if nic.onboot: + subnet.update({'control': 'auto'}) bootproto = nic.bootProto.lower() if nic.ipv4_mode.lower() == 'disabled': bootproto = 'manual' - lines.append('iface %s inet %s' % (name, bootproto)) if bootproto != 'static': - return lines + subnet.update({'type': 'dhcp'}) + return ([subnet], route_list) + else: + subnet.update({'type': 'static'}) # Static Ipv4 addrs = nic.staticIpv4 if not addrs: - return lines + return ([subnet], route_list) v4 = addrs[0] if v4.ip: - lines.append(' address %s' % v4.ip) + subnet.update({'address': v4.ip}) if v4.netmask: - lines.append(' netmask %s' % v4.netmask) + subnet.update({'netmask': v4.netmask}) # Add the primary gateway if nic.primary and v4.gateways: self.ipv4PrimaryGateway = v4.gateways[0] - lines.append(' gateway %s metric 0' % self.ipv4PrimaryGateway) - return lines + subnet.update({'gateway': self.ipv4PrimaryGateway}) + return [subnet] # Add routes if there is no primary nic if not self._primaryNic: - lines.extend(self.gen_ipv4_route(nic, v4.gateways)) + route_list.extend(self.gen_ipv4_route(nic, + v4.gateways, + v4.netmask)) - return lines + return ([subnet], route_list) - def gen_ipv4_route(self, nic, gateways): + def gen_ipv4_route(self, nic, gateways, netmask): """ - Return the lines needed to configure additional Ipv4 route - @return (str list): the string list to configure the gateways + Return the routes list needed to configure additional Ipv4 route + @return (list): the route list to configure the gateways @param nic (NicBase): the nic to configure @param gateways (str list): the list of gateways """ - lines = [] + route_list = [] + + cidr = mask_to_net_prefix(netmask) for gateway in gateways: - lines.append(' up route add default gw %s metric 10000' % - gateway) + destination = "%s/%d" % (gen_subnet(gateway, netmask), cidr) + route_list.append({'destination': destination, + 'type': 'route', + 'gateway': gateway, + 'metric': 10000}) - return lines + return route_list def gen_ipv6(self, name, nic): """ - Return the lines needed to configure the gateways for a nic - @return (str list): the string list to configure the gateways + Return the set of subnets and routes needed to configure the + gateways for a nic + @return (set): the set of subnets and routes to configure the gateways @param name (str): name of the nic @param nic (NicBase): the nic to configure """ - lines = [] if not nic.staticIpv6: - return lines + return ([], []) + subnet_list = [] # Static Ipv6 addrs = nic.staticIpv6 - lines.append('iface %s inet6 static' % name) - lines.append(' address %s' % addrs[0].ip) - lines.append(' netmask %s' % addrs[0].netmask) - for addr in addrs[1:]: - lines.append(' up ifconfig %s inet6 add %s/%s' % (name, addr.ip, - addr.netmask)) - # Add the primary gateway - if nic.primary: - for addr in addrs: - if addr.gateway: - self.ipv6PrimaryGateway = addr.gateway - lines.append(' gateway %s' % self.ipv6PrimaryGateway) - return lines + for addr in addrs: + subnet = {'type': 'static6', + 'address': addr.ip, + 'netmask': addr.netmask} + subnet_list.append(subnet) - # Add routes if there is no primary nic - if not self._primaryNic: - lines.extend(self._genIpv6Route(name, nic, addrs)) + # TODO: Add the primary gateway + + route_list = [] + # TODO: Add routes if there is no primary nic + # if not self._primaryNic: + # route_list.extend(self._genIpv6Route(name, nic, addrs)) - return lines + return (subnet_list, route_list) def _genIpv6Route(self, name, nic, addrs): - lines = [] + route_list = [] for addr in addrs: - lines.append(' up route -A inet6 add default gw ' - '%s metric 10000' % addr.gateway) + route_list.append({'type': 'route', + 'gateway': addr.gateway, + 'metric': 10000}) + + return route_list - return lines + def generate(self, configure=False, osfamily=None): + """Return the config elements that are needed to configure the nics""" + if configure: + logger.info("Configuring the interfaces file") + self.configure(osfamily) - def generate(self): - """Return the lines that is needed to configure the nics""" - lines = [] - lines.append('iface lo inet loopback') - lines.append('auto lo') - lines.append('') + nics_cfg_list = [] for nic in self.nics: - lines.extend(self.gen_one_nic(nic)) + nics_cfg_list.extend(self.gen_one_nic(nic)) - return lines + return nics_cfg_list def clear_dhcp(self): logger.info('Clearing DHCP leases') @@ -201,11 +252,16 @@ class NicConfigurator(object): util.subp(["pkill", "dhclient"], rcs=[0, 1]) util.subp(["rm", "-f", "/var/lib/dhcp/*"]) - def configure(self): + def configure(self, osfamily=None): """ - Configure the /etc/network/intefaces + Configure the /etc/network/interfaces Make a back up of the original """ + + if not osfamily or osfamily != "debian": + logger.info("Debian OS not detected. Skipping the configure step") + return + containingDir = '/etc/network' interfaceFile = os.path.join(containingDir, 'interfaces') @@ -215,10 +271,13 @@ class NicConfigurator(object): if not os.path.exists(originalFile) and os.path.exists(interfaceFile): os.rename(interfaceFile, originalFile) - lines = self.generate() - with open(interfaceFile, 'w') as fp: - for line in lines: - fp.write('%s\n' % line) + lines = [ + "# DO NOT EDIT THIS FILE BY HAND --" + " AUTOMATICALLY GENERATED BY cloud-init", + "source /etc/network/interfaces.d/*.cfg", + ] + + util.write_file(interfaceFile, content='\n'.join(lines)) self.clear_dhcp() diff --git a/cloudinit/sources/helpers/vmware/imc/guestcust_util.py b/cloudinit/sources/helpers/vmware/imc/guestcust_util.py index 1ab6bd41..44075255 100644 --- a/cloudinit/sources/helpers/vmware/imc/guestcust_util.py +++ b/cloudinit/sources/helpers/vmware/imc/guestcust_util.py @@ -59,14 +59,16 @@ def set_customization_status(custstate, custerror, errormessage=None): return (out, err) -# This will read the file nics.txt in the specified directory -# and return the content -def get_nics_to_enable(dirpath): - if not dirpath: +def get_nics_to_enable(nicsfilepath): + """Reads the NICS from the specified file path and returns the content + + @param nicsfilepath: Absolute file path to the NICS.txt file. + """ + + if not nicsfilepath: return None NICS_SIZE = 1024 - nicsfilepath = os.path.join(dirpath, "nics.txt") if not os.path.exists(nicsfilepath): return None diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py index d8651077..808d303a 100644 --- a/tests/unittests/test_vmware_config_file.py +++ b/tests/unittests/test_vmware_config_file.py @@ -8,9 +8,13 @@ import logging import sys +from cloudinit.sources.DataSourceOVF import get_network_config_from_conf +from cloudinit.sources.DataSourceOVF import read_vmware_imc from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum from cloudinit.sources.helpers.vmware.imc.config import Config from cloudinit.sources.helpers.vmware.imc.config_file import ConfigFile +from cloudinit.sources.helpers.vmware.imc.config_nic import gen_subnet +from cloudinit.sources.helpers.vmware.imc.config_nic import NicConfigurator from cloudinit.tests.helpers import CiTestCase logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) @@ -20,6 +24,7 @@ logger = logging.getLogger(__name__) class TestVmwareConfigFile(CiTestCase): def test_utility_methods(self): + """Tests basic utility methods of ConfigFile class""" cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") cf.clear() @@ -43,7 +48,26 @@ class TestVmwareConfigFile(CiTestCase): self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar") self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar") + def test_datasource_instance_id(self): + """Tests instance id for the DatasourceOVF""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + instance_id_prefix = 'iid-vmware-' + + conf = Config(cf) + + (md1, _, _) = read_vmware_imc(conf) + self.assertIn(instance_id_prefix, md1["instance-id"]) + self.assertEqual(len(md1["instance-id"]), len(instance_id_prefix) + 8) + + (md2, _, _) = read_vmware_imc(conf) + self.assertIn(instance_id_prefix, md2["instance-id"]) + self.assertEqual(len(md2["instance-id"]), len(instance_id_prefix) + 8) + + self.assertNotEqual(md1["instance-id"], md2["instance-id"]) + def test_configfile_static_2nics(self): + """Tests Config class for a configuration with two static NICs.""" cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") conf = Config(cf) @@ -81,6 +105,7 @@ class TestVmwareConfigFile(CiTestCase): self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp") def test_config_file_dhcp_2nics(self): + """Tests Config class for a configuration with two DHCP NICs.""" cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") conf = Config(cf) @@ -117,5 +142,197 @@ class TestVmwareConfigFile(CiTestCase): conf = Config(cf) self.assertTrue(conf.reset_password, "reset password") + def test_get_config_nameservers(self): + """Tests DNS and nameserver settings in a configuration.""" + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + config = Config(cf) + + network_config = get_network_config_from_conf(config, False) + + self.assertEqual(1, network_config.get('version')) + + config_types = network_config.get('config') + name_servers = None + dns_suffixes = None + + for type in config_types: + if type.get('type') == 'nameserver': + name_servers = type.get('address') + dns_suffixes = type.get('search') + break + + self.assertEqual(['10.20.145.1', '10.20.145.2'], + name_servers, + "dns") + self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'], + dns_suffixes, + "suffixes") + + def test_gen_subnet(self): + """Tests if gen_subnet properly calculates network subnet from + IPv4 address and netmask""" + ip_subnet_list = [['10.20.87.253', '255.255.252.0', '10.20.84.0'], + ['10.20.92.105', '255.255.252.0', '10.20.92.0'], + ['192.168.0.10', '255.255.0.0', '192.168.0.0']] + for entry in ip_subnet_list: + self.assertEqual(entry[2], gen_subnet(entry[0], entry[1]), + "Subnet for a specified ip and netmask") + + def test_get_config_dns_suffixes(self): + """Tests if get_network_config_from_conf properly + generates nameservers and dns settings from a + specified configuration""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + config = Config(cf) + + network_config = get_network_config_from_conf(config, False) + + self.assertEqual(1, network_config.get('version')) + + config_types = network_config.get('config') + name_servers = None + dns_suffixes = None + + for type in config_types: + if type.get('type') == 'nameserver': + name_servers = type.get('address') + dns_suffixes = type.get('search') + break + + self.assertEqual([], + name_servers, + "dns") + self.assertEqual(['eng.vmware.com'], + dns_suffixes, + "suffixes") + + def test_get_nics_list_dhcp(self): + """Tests if NicConfigurator properly calculates network subnets + for a configuration with a list of DHCP NICs""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + config = Config(cf) + + nicConfigurator = NicConfigurator(config.nics, False) + nics_cfg_list = nicConfigurator.generate() + + self.assertEqual(2, len(nics_cfg_list), "number of config elements") + + nic1 = {'name': 'NIC1'} + nic2 = {'name': 'NIC2'} + for cfg in nics_cfg_list: + if cfg.get('name') == nic1.get('name'): + nic1.update(cfg) + elif cfg.get('name') == nic2.get('name'): + nic2.update(cfg) + + self.assertEqual('physical', nic1.get('type'), 'type of NIC1') + self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1') + self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'), + 'mac address of NIC1') + subnets = nic1.get('subnets') + self.assertEqual(1, len(subnets), 'number of subnets for NIC1') + subnet = subnets[0] + self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC1') + self.assertEqual('auto', subnet.get('control'), 'NIC1 Control type') + + self.assertEqual('physical', nic2.get('type'), 'type of NIC2') + self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2') + self.assertEqual('00:50:56:a6:5a:de', nic2.get('mac_address'), + 'mac address of NIC2') + subnets = nic2.get('subnets') + self.assertEqual(1, len(subnets), 'number of subnets for NIC2') + subnet = subnets[0] + self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC2') + self.assertEqual('auto', subnet.get('control'), 'NIC2 Control type') + + def test_get_nics_list_static(self): + """Tests if NicConfigurator properly calculates network subnets + for a configuration with 2 static NICs""" + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + config = Config(cf) + + nicConfigurator = NicConfigurator(config.nics, False) + nics_cfg_list = nicConfigurator.generate() + + self.assertEqual(5, len(nics_cfg_list), "number of elements") + + nic1 = {'name': 'NIC1'} + nic2 = {'name': 'NIC2'} + route_list = [] + for cfg in nics_cfg_list: + cfg_type = cfg.get('type') + if cfg_type == 'physical': + if cfg.get('name') == nic1.get('name'): + nic1.update(cfg) + elif cfg.get('name') == nic2.get('name'): + nic2.update(cfg) + elif cfg_type == 'route': + route_list.append(cfg) + + self.assertEqual('physical', nic1.get('type'), 'type of NIC1') + self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1') + self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'), + 'mac address of NIC1') + + subnets = nic1.get('subnets') + self.assertEqual(2, len(subnets), 'Number of subnets') + + static_subnet = [] + static6_subnet = [] + + for subnet in subnets: + subnet_type = subnet.get('type') + if subnet_type == 'static': + static_subnet.append(subnet) + elif subnet_type == 'static6': + static6_subnet.append(subnet) + else: + self.assertEqual(True, False, 'Unknown type') + + self.assertEqual(1, len(static_subnet), 'Number of static subnet') + self.assertEqual(1, len(static6_subnet), 'Number of static6 subnet') + + subnet = static_subnet[0] + self.assertEqual('10.20.87.154', subnet.get('address'), + 'IPv4 address of static subnet') + self.assertEqual('255.255.252.0', subnet.get('netmask'), + 'NetMask of static subnet') + self.assertEqual('auto', subnet.get('control'), + 'control for static subnet') + + subnet = static6_subnet[0] + self.assertEqual('fc00:10:20:87::154', subnet.get('address'), + 'IPv6 address of static subnet') + self.assertEqual('64', subnet.get('netmask'), + 'NetMask of static6 subnet') + + route_set = set(['10.20.87.253', '10.20.87.105', '192.168.0.10']) + for route in route_list: + self.assertEqual(10000, route.get('metric'), 'metric of route') + gateway = route.get('gateway') + if gateway in route_set: + route_set.discard(gateway) + else: + self.assertEqual(True, False, 'invalid gateway %s' % (gateway)) + + self.assertEqual('physical', nic2.get('type'), 'type of NIC2') + self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2') + self.assertEqual('00:50:56:a6:ef:7d', nic2.get('mac_address'), + 'mac address of NIC2') + + subnets = nic2.get('subnets') + self.assertEqual(1, len(subnets), 'Number of subnets for NIC2') + + subnet = subnets[0] + self.assertEqual('static', subnet.get('type'), 'Subnet type') + self.assertEqual('192.168.6.102', subnet.get('address'), + 'Subnet address') + self.assertEqual('255.255.0.0', subnet.get('netmask'), + 'Subnet netmask') + # vi: ts=4 expandtab -- cgit v1.2.3