# This file is part of cloud-init. See LICENSE file for license information. import copy import os import shutil from functools import partial from os.path import dirname from cloudinit import helpers, util from cloudinit.config import cc_ntp from tests.unittests.helpers import ( CiTestCase, FilesystemMockingTestCase, mock, skipUnlessJsonSchema, ) from tests.unittests.util import get_cloud NTP_TEMPLATE = """\ ## template: jinja servers {{servers}} pools {{pools}} """ TIMESYNCD_TEMPLATE = """\ ## template:jinja [Time] {% if servers or pools -%} NTP={% for host in servers|list + pools|list %}{{ host }} {% endfor -%} {% endif -%} """ class TestNtp(FilesystemMockingTestCase): with_logs = True def setUp(self): super(TestNtp, self).setUp() self.new_root = self.tmp_dir() self.add_patch("cloudinit.util.system_is_snappy", "m_snappy") self.m_snappy.return_value = False self.new_root = self.reRoot() self._get_cloud = partial( get_cloud, paths=helpers.Paths({"templates_dir": self.new_root}) ) def _get_template_path(self, template_name, distro, basepath=None): # ntp.conf.{distro} -> ntp.conf.debian.tmpl template_fn = "{0}.tmpl".format( template_name.replace("{distro}", distro) ) if not basepath: basepath = self.new_root path = os.path.join(basepath, template_fn) return path def _generate_template(self, template=None): if not template: template = NTP_TEMPLATE confpath = os.path.join(self.new_root, "client.conf") template_fn = os.path.join(self.new_root, "client.conf.tmpl") util.write_file(template_fn, content=template) return (confpath, template_fn) def _mock_ntp_client_config(self, client=None, distro=None): if not client: client = "ntp" if not distro: distro = "ubuntu" dcfg = cc_ntp.distro_ntp_client_configs(distro) if client == "systemd-timesyncd": template = TIMESYNCD_TEMPLATE else: template = NTP_TEMPLATE (confpath, _template_fn) = self._generate_template(template=template) ntpconfig = copy.deepcopy(dcfg[client]) ntpconfig["confpath"] = confpath ntpconfig["template_name"] = os.path.basename(confpath) return ntpconfig @mock.patch("cloudinit.config.cc_ntp.subp") def test_ntp_install(self, mock_subp): """ntp_install_client runs install_func when check_exe is absent.""" mock_subp.which.return_value = None # check_exe not found. install_func = mock.MagicMock() cc_ntp.install_ntp_client( install_func, packages=["ntpx"], check_exe="ntpdx" ) mock_subp.which.assert_called_with("ntpdx") install_func.assert_called_once_with(["ntpx"]) @mock.patch("cloudinit.config.cc_ntp.subp") def test_ntp_install_not_needed(self, mock_subp): """ntp_install_client doesn't install when check_exe is found.""" client = "chrony" mock_subp.which.return_value = [client] # check_exe found. install_func = mock.MagicMock() cc_ntp.install_ntp_client( install_func, packages=[client], check_exe=client ) install_func.assert_not_called() @mock.patch("cloudinit.config.cc_ntp.subp") def test_ntp_install_no_op_with_empty_pkg_list(self, mock_subp): """ntp_install_client runs install_func with empty list""" mock_subp.which.return_value = None # check_exe not found install_func = mock.MagicMock() cc_ntp.install_ntp_client( install_func, packages=[], check_exe="timesyncd" ) install_func.assert_called_once_with([]) def test_ntp_rename_ntp_conf(self): """When NTP_CONF exists, rename_ntp moves it.""" ntpconf = self.tmp_path("ntp.conf", self.new_root) util.write_file(ntpconf, "") cc_ntp.rename_ntp_conf(confpath=ntpconf) self.assertFalse(os.path.exists(ntpconf)) self.assertTrue(os.path.exists("{0}.dist".format(ntpconf))) def test_ntp_rename_ntp_conf_skip_missing(self): """When NTP_CONF doesn't exist rename_ntp doesn't create a file.""" ntpconf = self.tmp_path("ntp.conf", self.new_root) self.assertFalse(os.path.exists(ntpconf)) cc_ntp.rename_ntp_conf(confpath=ntpconf) self.assertFalse(os.path.exists("{0}.dist".format(ntpconf))) self.assertFalse(os.path.exists(ntpconf)) def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers(self): """write_ntp_config_template reads from $client.conf.distro.tmpl""" servers = [] pools = ["10.0.0.1", "10.0.0.2"] (confpath, template_fn) = self._generate_template() mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR" with mock.patch(mock_path, self.new_root): cc_ntp.write_ntp_config_template( "ubuntu", servers=servers, pools=pools, path=confpath, template_fn=template_fn, template=None, ) self.assertEqual( "servers []\npools ['10.0.0.1', '10.0.0.2']\n", util.load_file(confpath), ) def test_write_ntp_config_template_defaults_pools_w_empty_lists(self): """write_ntp_config_template defaults pools servers upon empty config. When both pools and servers are empty, default NR_POOL_SERVERS get configured. """ distro = "ubuntu" pools = cc_ntp.generate_server_names(distro) servers = [] (confpath, template_fn) = self._generate_template() mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR" with mock.patch(mock_path, self.new_root): cc_ntp.write_ntp_config_template( distro, servers=servers, pools=pools, path=confpath, template_fn=template_fn, template=None, ) self.assertEqual( "servers []\npools {0}\n".format(pools), util.load_file(confpath) ) def test_defaults_pools_empty_lists_sles(self): """write_ntp_config_template defaults opensuse pools upon empty config. When both pools and servers are empty, default NR_POOL_SERVERS get configured. """ distro = "sles" default_pools = cc_ntp.generate_server_names(distro) (confpath, template_fn) = self._generate_template() cc_ntp.write_ntp_config_template( distro, servers=[], pools=[], path=confpath, template_fn=template_fn, template=None, ) for pool in default_pools: self.assertIn("opensuse", pool) self.assertEqual( "servers []\npools {0}\n".format(default_pools), util.load_file(confpath), ) self.assertIn( "Adding distro default ntp pool servers: {0}".format( ",".join(default_pools) ), self.logs.getvalue(), ) def test_timesyncd_template(self): """Test timesycnd template is correct""" pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"] servers = ["192.168.23.3", "192.168.23.4"] (confpath, template_fn) = self._generate_template( template=TIMESYNCD_TEMPLATE ) cc_ntp.write_ntp_config_template( "ubuntu", servers=servers, pools=pools, path=confpath, template_fn=template_fn, template=None, ) self.assertEqual( "[Time]\nNTP=%s %s \n" % (" ".join(servers), " ".join(pools)), util.load_file(confpath), ) def test_distro_ntp_client_configs(self): """Test we have updated ntp client configs on different distros""" delta = copy.deepcopy(cc_ntp.DISTRO_CLIENT_CONFIG) base = copy.deepcopy(cc_ntp.NTP_CLIENT_CONFIG) # confirm no-delta distros match the base config for distro in cc_ntp.distros: if distro not in delta: result = cc_ntp.distro_ntp_client_configs(distro) self.assertEqual(base, result) # for distros with delta, ensure the merged config values match # what is set in the delta for distro in delta.keys(): result = cc_ntp.distro_ntp_client_configs(distro) for client in delta[distro].keys(): for key in delta[distro][client].keys(): self.assertEqual( delta[distro][client][key], result[client][key] ) def _get_expected_pools(self, pools, distro, client): if client in ["ntp", "chrony"]: if client == "ntp" and distro == "alpine": # NTP for Alpine Linux is Busybox's ntp which does not # support 'pool' lines in its configuration file. expected_pools = [] else: expected_pools = [ "pool {0} iburst".format(pool) for pool in pools ] elif client == "systemd-timesyncd": expected_pools = " ".join(pools) return expected_pools def _get_expected_servers(self, servers, distro, client): if client in ["ntp", "chrony"]: if client == "ntp" and distro == "alpine": # NTP for Alpine Linux is Busybox's ntp which only supports # 'server' lines without iburst option. expected_servers = [ "server {0}".format(srv) for srv in servers ] else: expected_servers = [ "server {0} iburst".format(srv) for srv in servers ] elif client == "systemd-timesyncd": expected_servers = " ".join(servers) return expected_servers def test_ntp_handler_real_distro_ntp_templates(self): """Test ntp handler renders the shipped distro ntp client templates.""" pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"] servers = ["192.168.23.3", "192.168.23.4"] for client in ["ntp", "systemd-timesyncd", "chrony"]: for distro in cc_ntp.distros: distro_cfg = cc_ntp.distro_ntp_client_configs(distro) ntpclient = distro_cfg[client] confpath = os.path.join( self.new_root, ntpclient.get("confpath")[1:] ) template = ntpclient.get("template_name") # find sourcetree template file root_dir = ( dirname(dirname(os.path.realpath(util.__file__))) + "/templates" ) source_fn = self._get_template_path( template, distro, basepath=root_dir ) template_fn = self._get_template_path(template, distro) # don't fail if cloud-init doesn't have a template for # a distro,client pair if not os.path.exists(source_fn): continue # Create a copy in our tmp_dir shutil.copy(source_fn, template_fn) cc_ntp.write_ntp_config_template( distro, servers=servers, pools=pools, path=confpath, template_fn=template_fn, ) content = util.load_file(confpath) if client in ["ntp", "chrony"]: content_lines = content.splitlines() expected_servers = self._get_expected_servers( servers, distro, client ) print("distro=%s client=%s" % (distro, client)) for sline in expected_servers: self.assertIn( sline, content_lines, "failed to render {0} conf for distro:{1}".format( client, distro ), ) expected_pools = self._get_expected_pools( pools, distro, client ) if expected_pools != []: for pline in expected_pools: self.assertIn( pline, content_lines, "failed to render {0} conf" " for distro:{1}".format(client, distro), ) elif client == "systemd-timesyncd": expected_servers = self._get_expected_servers( servers, distro, client ) expected_pools = self._get_expected_pools( pools, distro, client ) expected_content = ( "# cloud-init generated file\n" + "# See timesyncd.conf(5) for details.\n\n" + "[Time]\nNTP=%s %s \n" % (expected_servers, expected_pools) ) self.assertEqual(expected_content, content) def test_no_ntpcfg_does_nothing(self): """When no ntp section is defined handler logs a warning and noops.""" cc_ntp.handle("cc_ntp", {}, None, None, []) self.assertEqual( "DEBUG: Skipping module named cc_ntp, " "not present or disabled by cfg\n", self.logs.getvalue(), ) @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_allows_empty_ntp_config( self, m_select ): """Ntp schema validation allows for an empty ntp: configuration.""" valid_empty_configs = [{"ntp": {}}, {"ntp": None}] for valid_empty_config in valid_empty_configs: for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig cc_ntp.handle("cc_ntp", valid_empty_config, mycloud, None, []) if distro == "alpine": # _mock_ntp_client_config call above did not specify a # client value and so it defaults to "ntp" which on # Alpine Linux only supports servers and not pools. servers = cc_ntp.generate_server_names(mycloud.distro.name) self.assertEqual( "servers {0}\npools []\n".format(servers), util.load_file(confpath), ) else: pools = cc_ntp.generate_server_names(mycloud.distro.name) self.assertEqual( "servers []\npools {0}\n".format(pools), util.load_file(confpath), ) self.assertNotIn( "Invalid cloud-config provided:", self.logs.getvalue() ) @skipUnlessJsonSchema() @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_warns_non_string_item_type( self, m_sel ): """Ntp schema validation warns of non-strings in pools or servers. Schema validation is not strict, so ntp config is still be rendered. """ invalid_config = {"ntp": {"pools": [123], "servers": ["valid", None]}} for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] m_sel.return_value = ntpconfig cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, []) self.assertIn( "Invalid cloud-config provided:\nntp.pools.0: 123 is not of" " type 'string'\nntp.servers.1: None is not of type 'string'", self.logs.getvalue(), ) self.assertEqual( "servers ['valid', None]\npools [123]\n", util.load_file(confpath), ) @skipUnlessJsonSchema() @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_warns_of_non_array_type( self, m_select ): """Ntp schema validation warns of non-array pools or servers types. Schema validation is not strict, so ntp config is still be rendered. """ invalid_config = {"ntp": {"pools": 123, "servers": "non-array"}} for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, []) self.assertIn( "Invalid cloud-config provided:\nntp.pools: 123 is not of type" " 'array'\nntp.servers: 'non-array' is not of type 'array'", self.logs.getvalue(), ) self.assertEqual( "servers non-array\npools 123\n", util.load_file(confpath) ) @skipUnlessJsonSchema() @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_warns_invalid_key_present( self, m_select ): """Ntp schema validation warns of invalid keys present in ntp config. Schema validation is not strict, so ntp config is still be rendered. """ invalid_config = { "ntp": {"invalidkey": 1, "pools": ["0.mycompany.pool.ntp.org"]} } for distro in cc_ntp.distros: if distro != "alpine": mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, []) self.assertIn( "Invalid cloud-config provided:\nntp: Additional" " properties are not allowed ('invalidkey' was" " unexpected)", self.logs.getvalue(), ) self.assertEqual( "servers []\npools ['0.mycompany.pool.ntp.org']\n", util.load_file(confpath), ) @skipUnlessJsonSchema() @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_schema_validation_warns_of_duplicates(self, m_select): """Ntp schema validation warns of duplicates in servers or pools. Schema validation is not strict, so ntp config is still be rendered. """ invalid_config = { "ntp": { "pools": ["0.mypool.org", "0.mypool.org"], "servers": ["10.0.0.1", "10.0.0.1"], } } for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, []) self.assertIn( "Invalid cloud-config provided:\nntp.pools: ['0.mypool.org'," " '0.mypool.org'] has non-unique elements\nntp.servers: " "['10.0.0.1', '10.0.0.1'] has non-unique elements", self.logs.getvalue(), ) self.assertEqual( "servers ['10.0.0.1', '10.0.0.1']\n" "pools ['0.mypool.org', '0.mypool.org']\n", util.load_file(confpath), ) @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_timesyncd(self, m_select): """Test ntp handler configures timesyncd""" servers = ["192.168.2.1", "192.168.2.2"] pools = ["0.mypool.org"] cfg = {"ntp": {"servers": servers, "pools": pools}} client = "systemd-timesyncd" for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config( distro=distro, client=client ) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig cc_ntp.handle("cc_ntp", cfg, mycloud, None, []) self.assertEqual( "[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n", util.load_file(confpath), ) @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_handler_enabled_false(self, m_select): """Test ntp handler does not run if enabled: false""" cfg = {"ntp": {"enabled": False}} for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) cc_ntp.handle("notimportant", cfg, mycloud, None, None) self.assertEqual(0, m_select.call_count) @mock.patch("cloudinit.distros.subp") @mock.patch("cloudinit.config.cc_ntp.subp") @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") @mock.patch("cloudinit.distros.Distro.uses_systemd") def test_ntp_the_whole_package(self, m_sysd, m_select, m_subp, m_dsubp): """Test enabled config renders template, and restarts service""" cfg = {"ntp": {"enabled": True}} for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config(distro=distro) confpath = ntpconfig["confpath"] service_name = ntpconfig["service_name"] m_select.return_value = ntpconfig hosts = cc_ntp.generate_server_names(mycloud.distro.name) uses_systemd = True expected_service_call = [ "systemctl", "reload-or-restart", service_name, ] expected_content = "servers []\npools {0}\n".format(hosts) if distro == "alpine": uses_systemd = False expected_service_call = ["rc-service", service_name, "restart"] # _mock_ntp_client_config call above did not specify a client # value and so it defaults to "ntp" which on Alpine Linux only # supports servers and not pools. expected_content = "servers {0}\npools []\n".format(hosts) m_sysd.return_value = uses_systemd with mock.patch("cloudinit.config.cc_ntp.util") as m_util: # allow use of util.mergemanydict m_util.mergemanydict.side_effect = util.mergemanydict # default client is present m_subp.which.return_value = True # use the config 'enabled' value m_util.is_false.return_value = util.is_false( cfg["ntp"]["enabled"] ) cc_ntp.handle("notimportant", cfg, mycloud, None, None) m_dsubp.subp.assert_called_with( expected_service_call, capture=True ) self.assertEqual(expected_content, util.load_file(confpath)) @mock.patch("cloudinit.util.system_info") def test_opensuse_picks_chrony(self, m_sysinfo): """Test opensuse picks chrony or ntp on certain distro versions""" # < 15.0 => ntp m_sysinfo.return_value = {"dist": ("openSUSE", "13.2", "Harlequin")} mycloud = self._get_cloud("opensuse") expected_client = mycloud.distro.preferred_ntp_clients[0] self.assertEqual("ntp", expected_client) # >= 15.0 and not openSUSE => chrony m_sysinfo.return_value = { "dist": ("SLES", "15.0", "SUSE Linux Enterprise Server 15") } mycloud = self._get_cloud("sles") expected_client = mycloud.distro.preferred_ntp_clients[0] self.assertEqual("chrony", expected_client) # >= 15.0 and openSUSE and ver != 42 => chrony m_sysinfo.return_value = { "dist": ("openSUSE Tumbleweed", "20180326", "timbleweed") } mycloud = self._get_cloud("opensuse") expected_client = mycloud.distro.preferred_ntp_clients[0] self.assertEqual("chrony", expected_client) @mock.patch("cloudinit.util.system_info") def test_ubuntu_xenial_picks_ntp(self, m_sysinfo): """Test Ubuntu picks ntp on xenial release""" m_sysinfo.return_value = {"dist": ("Ubuntu", "16.04", "xenial")} mycloud = self._get_cloud("ubuntu") expected_client = mycloud.distro.preferred_ntp_clients[0] self.assertEqual("ntp", expected_client) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_snappy_system_picks_timesyncd(self, m_which): """Test snappy systems prefer installed clients""" # we are on ubuntu-core here self.m_snappy.return_value = True # ubuntu core systems will have timesyncd installed m_which.side_effect = iter( [None, "/lib/systemd/systemd-timesyncd", None, None, None] ) distro = "ubuntu" mycloud = self._get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = "systemd-timesyncd" expected_cfg = distro_configs[expected_client] expected_calls = [] # we only get to timesyncd for client in mycloud.distro.preferred_ntp_clients[0:2]: cfg = distro_configs[client] expected_calls.append(mock.call(cfg["check_exe"])) result = cc_ntp.select_ntp_client(None, mycloud.distro) m_which.assert_has_calls(expected_calls) self.assertEqual(sorted(expected_cfg), sorted(cfg)) self.assertEqual(sorted(expected_cfg), sorted(result)) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_ntp_distro_searches_all_preferred_clients(self, m_which): """Test select_ntp_client search all distro perferred clients""" # nothing is installed m_which.return_value = None for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = mycloud.distro.preferred_ntp_clients[0] expected_cfg = distro_configs[expected_client] expected_calls = [] for client in mycloud.distro.preferred_ntp_clients: cfg = distro_configs[client] expected_calls.append(mock.call(cfg["check_exe"])) cc_ntp.select_ntp_client({}, mycloud.distro) m_which.assert_has_calls(expected_calls) self.assertEqual(sorted(expected_cfg), sorted(cfg)) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which): """Test user_cfg.ntp_client='auto' defaults to distro search""" # nothing is installed m_which.return_value = None for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_client = mycloud.distro.preferred_ntp_clients[0] expected_cfg = distro_configs[expected_client] expected_calls = [] for client in mycloud.distro.preferred_ntp_clients: cfg = distro_configs[client] expected_calls.append(mock.call(cfg["check_exe"])) cc_ntp.select_ntp_client("auto", mycloud.distro) m_which.assert_has_calls(expected_calls) self.assertEqual(sorted(expected_cfg), sorted(cfg)) @mock.patch("cloudinit.config.cc_ntp.write_ntp_config_template") @mock.patch("cloudinit.cloud.Cloud.get_template_filename") @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_ntp_custom_client_overrides_installed_clients( self, m_which, m_tmpfn, m_write ): """Test user client is installed despite other clients present""" client = "ntpdate" cfg = {"ntp": {"ntp_client": client}} for distro in cc_ntp.distros: # client is not installed m_which.side_effect = iter([None]) mycloud = self._get_cloud(distro) with mock.patch.object( mycloud.distro, "install_packages" ) as m_install: cc_ntp.handle("notimportant", cfg, mycloud, None, None) m_install.assert_called_with([client]) m_which.assert_called_with(client) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_ntp_system_config_overrides_distro_builtin_clients(self, m_which): """Test distro system_config overrides builtin preferred ntp clients""" system_client = "chrony" sys_cfg = {"ntp_client": system_client} # no clients installed m_which.return_value = None for distro in cc_ntp.distros: mycloud = self._get_cloud(distro, sys_cfg=sys_cfg) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_cfg = distro_configs[system_client] result = cc_ntp.select_ntp_client(None, mycloud.distro) self.assertEqual(sorted(expected_cfg), sorted(result)) m_which.assert_has_calls([]) @mock.patch("cloudinit.config.cc_ntp.subp.which") def test_ntp_user_config_overrides_system_cfg(self, m_which): """Test user-data overrides system_config ntp_client""" system_client = "chrony" sys_cfg = {"ntp_client": system_client} user_client = "systemd-timesyncd" # no clients installed m_which.return_value = None for distro in cc_ntp.distros: mycloud = self._get_cloud(distro, sys_cfg=sys_cfg) distro_configs = cc_ntp.distro_ntp_client_configs(distro) expected_cfg = distro_configs[user_client] result = cc_ntp.select_ntp_client(user_client, mycloud.distro) self.assertEqual(sorted(expected_cfg), sorted(result)) m_which.assert_has_calls([]) @mock.patch("cloudinit.config.cc_ntp.install_ntp_client") def test_ntp_user_provided_config_with_template(self, m_install): custom = r"\n#MyCustomTemplate" user_template = NTP_TEMPLATE + custom confpath = os.path.join(self.new_root, "etc/myntp/myntp.conf") cfg = { "ntp": { "pools": ["mypool.org"], "ntp_client": "myntpd", "config": { "check_exe": "myntpd", "confpath": confpath, "packages": ["myntp"], "service_name": "myntp", "template": user_template, }, } } for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR" with mock.patch(mock_path, self.new_root): cc_ntp.handle("notimportant", cfg, mycloud, None, None) self.assertEqual( "servers []\npools ['mypool.org']\n%s" % custom, util.load_file(confpath), ) @mock.patch("cloudinit.config.cc_ntp.supplemental_schema_validation") @mock.patch("cloudinit.config.cc_ntp.install_ntp_client") @mock.patch("cloudinit.config.cc_ntp.select_ntp_client") def test_ntp_user_provided_config_template_only( self, m_select, m_install, m_schema ): """Test custom template for default client""" custom = r"\n#MyCustomTemplate" user_template = NTP_TEMPLATE + custom client = "chrony" cfg = { "pools": ["mypool.org"], "ntp_client": client, "config": { "template": user_template, }, } expected_merged_cfg = { "check_exe": "chronyd", "confpath": "{tmpdir}/client.conf".format(tmpdir=self.new_root), "template_name": "client.conf", "template": user_template, "service_name": "chrony", "packages": ["chrony"], } for distro in cc_ntp.distros: mycloud = self._get_cloud(distro) ntpconfig = self._mock_ntp_client_config( client=client, distro=distro ) confpath = ntpconfig["confpath"] m_select.return_value = ntpconfig mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR" with mock.patch(mock_path, self.new_root): cc_ntp.handle( "notimportant", {"ntp": cfg}, mycloud, None, None ) self.assertEqual( "servers []\npools ['mypool.org']\n%s" % custom, util.load_file(confpath), ) m_schema.assert_called_with(expected_merged_cfg) class TestSupplementalSchemaValidation(CiTestCase): def test_error_on_missing_keys(self): """ValueError raised reporting any missing required ntp:config keys""" cfg = {} match = ( r"Invalid ntp configuration:\\nMissing required ntp:config" " keys: check_exe, confpath, packages, service_name" ) with self.assertRaisesRegex(ValueError, match): cc_ntp.supplemental_schema_validation(cfg) def test_error_requiring_either_template_or_template_name(self): """ValueError raised if both template not template_name are None.""" cfg = { "confpath": "someconf", "check_exe": "", "service_name": "", "template": None, "template_name": None, "packages": [], } match = ( r"Invalid ntp configuration:\\nEither ntp:config:template" " or ntp:config:template_name values are required" ) with self.assertRaisesRegex(ValueError, match): cc_ntp.supplemental_schema_validation(cfg) def test_error_on_non_list_values(self): """ValueError raised when packages is not of type list.""" cfg = { "confpath": "someconf", "check_exe": "", "service_name": "", "template": "asdf", "template_name": None, "packages": "NOPE", } match = ( r"Invalid ntp configuration:\\nExpected a list of required" " package names for ntp:config:packages. Found \\(NOPE\\)" ) with self.assertRaisesRegex(ValueError, match): cc_ntp.supplemental_schema_validation(cfg) def test_error_on_non_string_values(self): """ValueError raised for any values expected as string type.""" cfg = { "confpath": 1, "check_exe": 2, "service_name": 3, "template": 4, "template_name": 5, "packages": [], } errors = [ "Expected a config file path ntp:config:confpath. Found (1)", "Expected a string type for ntp:config:check_exe. Found (2)", "Expected a string type for ntp:config:service_name. Found (3)", "Expected a string type for ntp:config:template. Found (4)", "Expected a string type for ntp:config:template_name. Found (5)", ] with self.assertRaises(ValueError) as context_mgr: cc_ntp.supplemental_schema_validation(cfg) error_msg = str(context_mgr.exception) for error in errors: self.assertIn(error, error_msg) # vi: ts=4 expandtab