diff options
author | zdc <zdc@users.noreply.github.com> | 2022-03-26 15:41:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-26 15:41:59 +0200 |
commit | aa60d48c2711cdcd9f88a4e5c77379adb0408231 (patch) | |
tree | 349631a02467dae0158f6f663cc8aa8537974a97 /tests/unittests/sources/vmware/test_vmware_config_file.py | |
parent | 5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff) | |
parent | 31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (diff) | |
download | vyos-cloud-init-aa60d48c2711cdcd9f88a4e5c77379adb0408231.tar.gz vyos-cloud-init-aa60d48c2711cdcd9f88a4e5c77379adb0408231.zip |
Merge pull request #51 from zdc/T2117-sagitta-22.1
T2117: Cloud-init updated to 22.1
Diffstat (limited to 'tests/unittests/sources/vmware/test_vmware_config_file.py')
-rw-r--r-- | tests/unittests/sources/vmware/test_vmware_config_file.py | 635 |
1 files changed, 635 insertions, 0 deletions
diff --git a/tests/unittests/sources/vmware/test_vmware_config_file.py b/tests/unittests/sources/vmware/test_vmware_config_file.py new file mode 100644 index 00000000..38d45d0e --- /dev/null +++ b/tests/unittests/sources/vmware/test_vmware_config_file.py @@ -0,0 +1,635 @@ +# Copyright (C) 2015 Canonical Ltd. +# Copyright (C) 2016 VMware INC. +# +# Author: Sankar Tanguturi <stanguturi@vmware.com> +# Pengpeng Sun <pengpengs@vmware.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +import logging +import os +import sys +import tempfile +import textwrap + +from cloudinit.sources.DataSourceOVF import ( + get_network_config_from_conf, + read_vmware_imc, +) +from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum +from cloudinit.sources.helpers.vmware.imc.config import Config +from cloudinit.sources.helpers.vmware.imc.config_file import ( + ConfigFile as WrappedConfigFile, +) +from cloudinit.sources.helpers.vmware.imc.config_nic import ( + NicConfigurator, + gen_subnet, +) +from tests.unittests.helpers import CiTestCase, cloud_init_project_dir + +logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) +logger = logging.getLogger(__name__) + + +def ConfigFile(path: str): + return WrappedConfigFile(cloud_init_project_dir(path)) + + +class TestVmwareConfigFile(CiTestCase): + def test_utility_methods(self): + """Tests basic utility methods of ConfigFile class""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + cf.clear() + + self.assertEqual(0, len(cf), "clear size") + + cf._insertKey(" PASSWORD|-PASS ", " foo ") + cf._insertKey("BAR", " ") + + self.assertEqual(2, len(cf), "insert size") + self.assertEqual("foo", cf["PASSWORD|-PASS"], "password") + self.assertTrue("PASSWORD|-PASS" in cf, "hasPassword") + self.assertFalse( + cf.should_keep_current_value("PASSWORD|-PASS"), "keepPassword" + ) + self.assertFalse( + cf.should_remove_current_value("PASSWORD|-PASS"), "removePassword" + ) + self.assertFalse("FOO" in cf, "hasFoo") + self.assertTrue(cf.should_keep_current_value("FOO"), "keepFoo") + self.assertFalse(cf.should_remove_current_value("FOO"), "removeFoo") + self.assertTrue("BAR" in cf, "hasBar") + self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar") + self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar") + + def test_datasource_instance_id(self): + """Tests instance id for the DatasourceOVF""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + instance_id_prefix = "iid-vmware-" + + conf = Config(cf) + + (md1, _, _) = read_vmware_imc(conf) + self.assertIn(instance_id_prefix, md1["instance-id"]) + self.assertEqual(md1["instance-id"], "iid-vmware-imc") + + (md2, _, _) = read_vmware_imc(conf) + self.assertIn(instance_id_prefix, md2["instance-id"]) + self.assertEqual(md2["instance-id"], "iid-vmware-imc") + + self.assertEqual(md2["instance-id"], md1["instance-id"]) + + def test_configfile_static_2nics(self): + """Tests Config class for a configuration with two static NICs.""" + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + conf = Config(cf) + + self.assertEqual("myhost1", conf.host_name, "hostName") + self.assertEqual("Africa/Abidjan", conf.timezone, "tz") + self.assertTrue(conf.utc, "utc") + + self.assertEqual( + ["10.20.145.1", "10.20.145.2"], conf.name_servers, "dns" + ) + self.assertEqual( + ["eng.vmware.com", "proxy.vmware.com"], + conf.dns_suffixes, + "suffixes", + ) + + nics = conf.nics + ipv40 = nics[0].staticIpv4 + + self.assertEqual(2, len(nics), "nics") + self.assertEqual("NIC1", nics[0].name, "nic0") + self.assertEqual("00:50:56:a6:8c:08", nics[0].mac, "mac0") + self.assertEqual(BootProtoEnum.STATIC, nics[0].bootProto, "bootproto0") + self.assertEqual("10.20.87.154", ipv40[0].ip, "ipv4Addr0") + self.assertEqual("255.255.252.0", ipv40[0].netmask, "ipv4Mask0") + self.assertEqual(2, len(ipv40[0].gateways), "ipv4Gw0") + self.assertEqual("10.20.87.253", ipv40[0].gateways[0], "ipv4Gw0_0") + self.assertEqual("10.20.87.105", ipv40[0].gateways[1], "ipv4Gw0_1") + + self.assertEqual(1, len(nics[0].staticIpv6), "ipv6Cnt0") + self.assertEqual( + "fc00:10:20:87::154", nics[0].staticIpv6[0].ip, "ipv6Addr0" + ) + + self.assertEqual("NIC2", nics[1].name, "nic1") + self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp") + + def test_config_file_dhcp_2nics(self): + """Tests Config class for a configuration with two DHCP NICs.""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + conf = Config(cf) + nics = conf.nics + self.assertEqual(2, len(nics), "nics") + self.assertEqual("NIC1", nics[0].name, "nic0") + self.assertEqual("00:50:56:a6:8c:08", nics[0].mac, "mac0") + self.assertEqual(BootProtoEnum.DHCP, nics[0].bootProto, "bootproto0") + + def test_config_password(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + cf._insertKey("PASSWORD|-PASS", "test-password") + cf._insertKey("PASSWORD|RESET", "no") + + conf = Config(cf) + self.assertEqual("test-password", conf.admin_password, "password") + self.assertFalse(conf.reset_password, "do not reset password") + + def test_config_reset_passwd(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + cf._insertKey("PASSWORD|-PASS", "test-password") + cf._insertKey("PASSWORD|RESET", "random") + + conf = Config(cf) + with self.assertRaises(ValueError): + pw = conf.reset_password + self.assertIsNone(pw) + + cf.clear() + cf._insertKey("PASSWORD|RESET", "yes") + self.assertEqual(1, len(cf), "insert size") + + conf = Config(cf) + self.assertTrue(conf.reset_password, "reset password") + + def test_get_config_nameservers(self): + """Tests DNS and nameserver settings in a configuration.""" + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + config = Config(cf) + + network_config = get_network_config_from_conf(config, False) + + self.assertEqual(1, network_config.get("version")) + + config_types = network_config.get("config") + name_servers = None + dns_suffixes = None + + for type in config_types: + if type.get("type") == "nameserver": + name_servers = type.get("address") + dns_suffixes = type.get("search") + break + + self.assertEqual(["10.20.145.1", "10.20.145.2"], name_servers, "dns") + self.assertEqual( + ["eng.vmware.com", "proxy.vmware.com"], dns_suffixes, "suffixes" + ) + + def test_gen_subnet(self): + """Tests if gen_subnet properly calculates network subnet from + IPv4 address and netmask""" + ip_subnet_list = [ + ["10.20.87.253", "255.255.252.0", "10.20.84.0"], + ["10.20.92.105", "255.255.252.0", "10.20.92.0"], + ["192.168.0.10", "255.255.0.0", "192.168.0.0"], + ] + for entry in ip_subnet_list: + self.assertEqual( + entry[2], + gen_subnet(entry[0], entry[1]), + "Subnet for a specified ip and netmask", + ) + + def test_get_config_dns_suffixes(self): + """Tests if get_network_config_from_conf properly + generates nameservers and dns settings from a + specified configuration""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + config = Config(cf) + + network_config = get_network_config_from_conf(config, False) + + self.assertEqual(1, network_config.get("version")) + + config_types = network_config.get("config") + name_servers = None + dns_suffixes = None + + for type in config_types: + if type.get("type") == "nameserver": + name_servers = type.get("address") + dns_suffixes = type.get("search") + break + + self.assertEqual([], name_servers, "dns") + self.assertEqual(["eng.vmware.com"], dns_suffixes, "suffixes") + + def test_get_nics_list_dhcp(self): + """Tests if NicConfigurator properly calculates network subnets + for a configuration with a list of DHCP NICs""" + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + config = Config(cf) + + nicConfigurator = NicConfigurator(config.nics, False) + nics_cfg_list = nicConfigurator.generate() + + self.assertEqual(2, len(nics_cfg_list), "number of config elements") + + nic1 = {"name": "NIC1"} + nic2 = {"name": "NIC2"} + for cfg in nics_cfg_list: + if cfg.get("name") == nic1.get("name"): + nic1.update(cfg) + elif cfg.get("name") == nic2.get("name"): + nic2.update(cfg) + + self.assertEqual("physical", nic1.get("type"), "type of NIC1") + self.assertEqual("NIC1", nic1.get("name"), "name of NIC1") + self.assertEqual( + "00:50:56:a6:8c:08", nic1.get("mac_address"), "mac address of NIC1" + ) + subnets = nic1.get("subnets") + self.assertEqual(1, len(subnets), "number of subnets for NIC1") + subnet = subnets[0] + self.assertEqual("dhcp", subnet.get("type"), "DHCP type for NIC1") + self.assertEqual("auto", subnet.get("control"), "NIC1 Control type") + + self.assertEqual("physical", nic2.get("type"), "type of NIC2") + self.assertEqual("NIC2", nic2.get("name"), "name of NIC2") + self.assertEqual( + "00:50:56:a6:5a:de", nic2.get("mac_address"), "mac address of NIC2" + ) + subnets = nic2.get("subnets") + self.assertEqual(1, len(subnets), "number of subnets for NIC2") + subnet = subnets[0] + self.assertEqual("dhcp", subnet.get("type"), "DHCP type for NIC2") + self.assertEqual("auto", subnet.get("control"), "NIC2 Control type") + + def test_get_nics_list_static(self): + """Tests if NicConfigurator properly calculates network subnets + for a configuration with 2 static NICs""" + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + config = Config(cf) + + nicConfigurator = NicConfigurator(config.nics, False) + nics_cfg_list = nicConfigurator.generate() + + self.assertEqual(2, len(nics_cfg_list), "number of elements") + + nic1 = {"name": "NIC1"} + nic2 = {"name": "NIC2"} + route_list = [] + for cfg in nics_cfg_list: + cfg_type = cfg.get("type") + if cfg_type == "physical": + if cfg.get("name") == nic1.get("name"): + nic1.update(cfg) + elif cfg.get("name") == nic2.get("name"): + nic2.update(cfg) + + self.assertEqual("physical", nic1.get("type"), "type of NIC1") + self.assertEqual("NIC1", nic1.get("name"), "name of NIC1") + self.assertEqual( + "00:50:56:a6:8c:08", nic1.get("mac_address"), "mac address of NIC1" + ) + + subnets = nic1.get("subnets") + self.assertEqual(2, len(subnets), "Number of subnets") + + static_subnet = [] + static6_subnet = [] + + for subnet in subnets: + subnet_type = subnet.get("type") + if subnet_type == "static": + static_subnet.append(subnet) + elif subnet_type == "static6": + static6_subnet.append(subnet) + else: + self.assertEqual(True, False, "Unknown type") + if "route" in subnet: + for route in subnet.get("routes"): + route_list.append(route) + + self.assertEqual(1, len(static_subnet), "Number of static subnet") + self.assertEqual(1, len(static6_subnet), "Number of static6 subnet") + + subnet = static_subnet[0] + self.assertEqual( + "10.20.87.154", + subnet.get("address"), + "IPv4 address of static subnet", + ) + self.assertEqual( + "255.255.252.0", subnet.get("netmask"), "NetMask of static subnet" + ) + self.assertEqual( + "auto", subnet.get("control"), "control for static subnet" + ) + + subnet = static6_subnet[0] + self.assertEqual( + "fc00:10:20:87::154", + subnet.get("address"), + "IPv6 address of static subnet", + ) + self.assertEqual( + "64", subnet.get("netmask"), "NetMask of static6 subnet" + ) + + route_set = set(["10.20.87.253", "10.20.87.105", "192.168.0.10"]) + for route in route_list: + self.assertEqual(10000, route.get("metric"), "metric of route") + gateway = route.get("gateway") + if gateway in route_set: + route_set.discard(gateway) + else: + self.assertEqual(True, False, "invalid gateway %s" % (gateway)) + + self.assertEqual("physical", nic2.get("type"), "type of NIC2") + self.assertEqual("NIC2", nic2.get("name"), "name of NIC2") + self.assertEqual( + "00:50:56:a6:ef:7d", nic2.get("mac_address"), "mac address of NIC2" + ) + + subnets = nic2.get("subnets") + self.assertEqual(1, len(subnets), "Number of subnets for NIC2") + + subnet = subnets[0] + self.assertEqual("static", subnet.get("type"), "Subnet type") + self.assertEqual( + "192.168.6.102", subnet.get("address"), "Subnet address" + ) + self.assertEqual( + "255.255.0.0", subnet.get("netmask"), "Subnet netmask" + ) + + def test_custom_script(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertIsNone(conf.custom_script_name) + cf._insertKey("CUSTOM-SCRIPT|SCRIPT-NAME", "test-script") + conf = Config(cf) + self.assertEqual("test-script", conf.custom_script_name) + + def test_post_gc_status(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertFalse(conf.post_gc_status) + cf._insertKey("MISC|POST-GC-STATUS", "YES") + conf = Config(cf) + self.assertTrue(conf.post_gc_status) + + def test_no_default_run_post_script(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertFalse(conf.default_run_post_script) + cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "NO") + conf = Config(cf) + self.assertFalse(conf.default_run_post_script) + + def test_yes_default_run_post_script(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "yes") + conf = Config(cf) + self.assertTrue(conf.default_run_post_script) + + +class TestVmwareNetConfig(CiTestCase): + """Test conversion of vmware config to cloud-init config.""" + + maxDiff = None + + def _get_NicConfigurator(self, text): + fp = None + try: + with tempfile.NamedTemporaryFile( + mode="w", dir=self.tmp_dir(), delete=False + ) as fp: + fp.write(text) + fp.close() + cfg = Config(ConfigFile(fp.name)) + return NicConfigurator(cfg.nics, use_system_devices=False) + finally: + if fp: + os.unlink(fp.name) + + def test_non_primary_nic_without_gateway(self): + """A non primary nic set is not required to have a gateway.""" + config = textwrap.dedent( + """\ + [NETWORK] + NETWORKING = yes + BOOTPROTO = dhcp + HOSTNAME = myhost1 + DOMAINNAME = eng.vmware.com + + [NIC-CONFIG] + NICS = NIC1 + + [NIC1] + MACADDR = 00:50:56:a6:8c:08 + ONBOOT = yes + IPv4_MODE = BACKWARDS_COMPATIBLE + BOOTPROTO = static + IPADDR = 10.20.87.154 + NETMASK = 255.255.252.0 + """ + ) + nc = self._get_NicConfigurator(config) + self.assertEqual( + [ + { + "type": "physical", + "name": "NIC1", + "mac_address": "00:50:56:a6:8c:08", + "subnets": [ + { + "control": "auto", + "type": "static", + "address": "10.20.87.154", + "netmask": "255.255.252.0", + } + ], + } + ], + nc.generate(), + ) + + def test_non_primary_nic_with_gateway(self): + """A non primary nic set can have a gateway.""" + config = textwrap.dedent( + """\ + [NETWORK] + NETWORKING = yes + BOOTPROTO = dhcp + HOSTNAME = myhost1 + DOMAINNAME = eng.vmware.com + + [NIC-CONFIG] + NICS = NIC1 + + [NIC1] + MACADDR = 00:50:56:a6:8c:08 + ONBOOT = yes + IPv4_MODE = BACKWARDS_COMPATIBLE + BOOTPROTO = static + IPADDR = 10.20.87.154 + NETMASK = 255.255.252.0 + GATEWAY = 10.20.87.253 + """ + ) + nc = self._get_NicConfigurator(config) + self.assertEqual( + [ + { + "type": "physical", + "name": "NIC1", + "mac_address": "00:50:56:a6:8c:08", + "subnets": [ + { + "control": "auto", + "type": "static", + "address": "10.20.87.154", + "netmask": "255.255.252.0", + "routes": [ + { + "type": "route", + "destination": "10.20.84.0/22", + "gateway": "10.20.87.253", + "metric": 10000, + } + ], + } + ], + } + ], + nc.generate(), + ) + + def test_cust_non_primary_nic_with_gateway_(self): + """A customer non primary nic set can have a gateway.""" + config = textwrap.dedent( + """\ + [NETWORK] + NETWORKING = yes + BOOTPROTO = dhcp + HOSTNAME = static-debug-vm + DOMAINNAME = cluster.local + + [NIC-CONFIG] + NICS = NIC1 + + [NIC1] + MACADDR = 00:50:56:ac:d1:8a + ONBOOT = yes + IPv4_MODE = BACKWARDS_COMPATIBLE + BOOTPROTO = static + IPADDR = 100.115.223.75 + NETMASK = 255.255.255.0 + GATEWAY = 100.115.223.254 + + + [DNS] + DNSFROMDHCP=no + + NAMESERVER|1 = 8.8.8.8 + + [DATETIME] + UTC = yes + """ + ) + nc = self._get_NicConfigurator(config) + self.assertEqual( + [ + { + "type": "physical", + "name": "NIC1", + "mac_address": "00:50:56:ac:d1:8a", + "subnets": [ + { + "control": "auto", + "type": "static", + "address": "100.115.223.75", + "netmask": "255.255.255.0", + "routes": [ + { + "type": "route", + "destination": "100.115.223.0/24", + "gateway": "100.115.223.254", + "metric": 10000, + } + ], + } + ], + } + ], + nc.generate(), + ) + + def test_a_primary_nic_with_gateway(self): + """A primary nic set can have a gateway.""" + config = textwrap.dedent( + """\ + [NETWORK] + NETWORKING = yes + BOOTPROTO = dhcp + HOSTNAME = myhost1 + DOMAINNAME = eng.vmware.com + + [NIC-CONFIG] + NICS = NIC1 + + [NIC1] + MACADDR = 00:50:56:a6:8c:08 + ONBOOT = yes + IPv4_MODE = BACKWARDS_COMPATIBLE + BOOTPROTO = static + IPADDR = 10.20.87.154 + NETMASK = 255.255.252.0 + PRIMARY = true + GATEWAY = 10.20.87.253 + """ + ) + nc = self._get_NicConfigurator(config) + self.assertEqual( + [ + { + "type": "physical", + "name": "NIC1", + "mac_address": "00:50:56:a6:8c:08", + "subnets": [ + { + "control": "auto", + "type": "static", + "address": "10.20.87.154", + "netmask": "255.255.252.0", + "gateway": "10.20.87.253", + } + ], + } + ], + nc.generate(), + ) + + def test_meta_data(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertIsNone(conf.meta_data_name) + cf._insertKey("CLOUDINIT|METADATA", "test-metadata") + conf = Config(cf) + self.assertEqual("test-metadata", conf.meta_data_name) + + def test_user_data(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertIsNone(conf.user_data_name) + cf._insertKey("CLOUDINIT|USERDATA", "test-userdata") + conf = Config(cf) + self.assertEqual("test-userdata", conf.user_data_name) + + +# vi: ts=4 expandtab |