summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_ntp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_ntp.py')
-rw-r--r--tests/unittests/config/test_cc_ntp.py870
1 files changed, 870 insertions, 0 deletions
diff --git a/tests/unittests/config/test_cc_ntp.py b/tests/unittests/config/test_cc_ntp.py
new file mode 100644
index 00000000..fba141aa
--- /dev/null
+++ b/tests/unittests/config/test_cc_ntp.py
@@ -0,0 +1,870 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+import copy
+import os
+import shutil
+from functools import partial
+from os.path import dirname
+
+from cloudinit import helpers, util
+from cloudinit.config import cc_ntp
+from tests.unittests.helpers import (
+ CiTestCase,
+ FilesystemMockingTestCase,
+ mock,
+ skipUnlessJsonSchema,
+)
+from tests.unittests.util import get_cloud
+
+NTP_TEMPLATE = """\
+## template: jinja
+servers {{servers}}
+pools {{pools}}
+"""
+
+TIMESYNCD_TEMPLATE = """\
+## template:jinja
+[Time]
+{% if servers or pools -%}
+NTP={% for host in servers|list + pools|list %}{{ host }} {% endfor -%}
+{% endif -%}
+"""
+
+
+class TestNtp(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestNtp, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.add_patch("cloudinit.util.system_is_snappy", "m_snappy")
+ self.m_snappy.return_value = False
+ self.new_root = self.reRoot()
+ self._get_cloud = partial(
+ get_cloud, paths=helpers.Paths({"templates_dir": self.new_root})
+ )
+
+ def _get_template_path(self, template_name, distro, basepath=None):
+ # ntp.conf.{distro} -> ntp.conf.debian.tmpl
+ template_fn = "{0}.tmpl".format(
+ template_name.replace("{distro}", distro)
+ )
+ if not basepath:
+ basepath = self.new_root
+ path = os.path.join(basepath, template_fn)
+ return path
+
+ def _generate_template(self, template=None):
+ if not template:
+ template = NTP_TEMPLATE
+ confpath = os.path.join(self.new_root, "client.conf")
+ template_fn = os.path.join(self.new_root, "client.conf.tmpl")
+ util.write_file(template_fn, content=template)
+ return (confpath, template_fn)
+
+ def _mock_ntp_client_config(self, client=None, distro=None):
+ if not client:
+ client = "ntp"
+ if not distro:
+ distro = "ubuntu"
+ dcfg = cc_ntp.distro_ntp_client_configs(distro)
+ if client == "systemd-timesyncd":
+ template = TIMESYNCD_TEMPLATE
+ else:
+ template = NTP_TEMPLATE
+ (confpath, _template_fn) = self._generate_template(template=template)
+ ntpconfig = copy.deepcopy(dcfg[client])
+ ntpconfig["confpath"] = confpath
+ ntpconfig["template_name"] = os.path.basename(confpath)
+ return ntpconfig
+
+ @mock.patch("cloudinit.config.cc_ntp.subp")
+ def test_ntp_install(self, mock_subp):
+ """ntp_install_client runs install_func when check_exe is absent."""
+ mock_subp.which.return_value = None # check_exe not found.
+ install_func = mock.MagicMock()
+ cc_ntp.install_ntp_client(
+ install_func, packages=["ntpx"], check_exe="ntpdx"
+ )
+ mock_subp.which.assert_called_with("ntpdx")
+ install_func.assert_called_once_with(["ntpx"])
+
+ @mock.patch("cloudinit.config.cc_ntp.subp")
+ def test_ntp_install_not_needed(self, mock_subp):
+ """ntp_install_client doesn't install when check_exe is found."""
+ client = "chrony"
+ mock_subp.which.return_value = [client] # check_exe found.
+ install_func = mock.MagicMock()
+ cc_ntp.install_ntp_client(
+ install_func, packages=[client], check_exe=client
+ )
+ install_func.assert_not_called()
+
+ @mock.patch("cloudinit.config.cc_ntp.subp")
+ def test_ntp_install_no_op_with_empty_pkg_list(self, mock_subp):
+ """ntp_install_client runs install_func with empty list"""
+ mock_subp.which.return_value = None # check_exe not found
+ install_func = mock.MagicMock()
+ cc_ntp.install_ntp_client(
+ install_func, packages=[], check_exe="timesyncd"
+ )
+ install_func.assert_called_once_with([])
+
+ def test_ntp_rename_ntp_conf(self):
+ """When NTP_CONF exists, rename_ntp moves it."""
+ ntpconf = self.tmp_path("ntp.conf", self.new_root)
+ util.write_file(ntpconf, "")
+ cc_ntp.rename_ntp_conf(confpath=ntpconf)
+ self.assertFalse(os.path.exists(ntpconf))
+ self.assertTrue(os.path.exists("{0}.dist".format(ntpconf)))
+
+ def test_ntp_rename_ntp_conf_skip_missing(self):
+ """When NTP_CONF doesn't exist rename_ntp doesn't create a file."""
+ ntpconf = self.tmp_path("ntp.conf", self.new_root)
+ self.assertFalse(os.path.exists(ntpconf))
+ cc_ntp.rename_ntp_conf(confpath=ntpconf)
+ self.assertFalse(os.path.exists("{0}.dist".format(ntpconf)))
+ self.assertFalse(os.path.exists(ntpconf))
+
+ def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers(self):
+ """write_ntp_config_template reads from $client.conf.distro.tmpl"""
+ servers = []
+ pools = ["10.0.0.1", "10.0.0.2"]
+ (confpath, template_fn) = self._generate_template()
+ mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR"
+ with mock.patch(mock_path, self.new_root):
+ cc_ntp.write_ntp_config_template(
+ "ubuntu",
+ servers=servers,
+ pools=pools,
+ path=confpath,
+ template_fn=template_fn,
+ template=None,
+ )
+ self.assertEqual(
+ "servers []\npools ['10.0.0.1', '10.0.0.2']\n",
+ util.load_file(confpath),
+ )
+
+ def test_write_ntp_config_template_defaults_pools_w_empty_lists(self):
+ """write_ntp_config_template defaults pools servers upon empty config.
+
+ When both pools and servers are empty, default NR_POOL_SERVERS get
+ configured.
+ """
+ distro = "ubuntu"
+ pools = cc_ntp.generate_server_names(distro)
+ servers = []
+ (confpath, template_fn) = self._generate_template()
+ mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR"
+ with mock.patch(mock_path, self.new_root):
+ cc_ntp.write_ntp_config_template(
+ distro,
+ servers=servers,
+ pools=pools,
+ path=confpath,
+ template_fn=template_fn,
+ template=None,
+ )
+ self.assertEqual(
+ "servers []\npools {0}\n".format(pools), util.load_file(confpath)
+ )
+
+ def test_defaults_pools_empty_lists_sles(self):
+ """write_ntp_config_template defaults opensuse pools upon empty config.
+
+ When both pools and servers are empty, default NR_POOL_SERVERS get
+ configured.
+ """
+ distro = "sles"
+ default_pools = cc_ntp.generate_server_names(distro)
+ (confpath, template_fn) = self._generate_template()
+
+ cc_ntp.write_ntp_config_template(
+ distro,
+ servers=[],
+ pools=[],
+ path=confpath,
+ template_fn=template_fn,
+ template=None,
+ )
+ for pool in default_pools:
+ self.assertIn("opensuse", pool)
+ self.assertEqual(
+ "servers []\npools {0}\n".format(default_pools),
+ util.load_file(confpath),
+ )
+ self.assertIn(
+ "Adding distro default ntp pool servers: {0}".format(
+ ",".join(default_pools)
+ ),
+ self.logs.getvalue(),
+ )
+
+ def test_timesyncd_template(self):
+ """Test timesycnd template is correct"""
+ pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"]
+ servers = ["192.168.23.3", "192.168.23.4"]
+ (confpath, template_fn) = self._generate_template(
+ template=TIMESYNCD_TEMPLATE
+ )
+ cc_ntp.write_ntp_config_template(
+ "ubuntu",
+ servers=servers,
+ pools=pools,
+ path=confpath,
+ template_fn=template_fn,
+ template=None,
+ )
+ self.assertEqual(
+ "[Time]\nNTP=%s %s \n" % (" ".join(servers), " ".join(pools)),
+ util.load_file(confpath),
+ )
+
+ def test_distro_ntp_client_configs(self):
+ """Test we have updated ntp client configs on different distros"""
+ delta = copy.deepcopy(cc_ntp.DISTRO_CLIENT_CONFIG)
+ base = copy.deepcopy(cc_ntp.NTP_CLIENT_CONFIG)
+ # confirm no-delta distros match the base config
+ for distro in cc_ntp.distros:
+ if distro not in delta:
+ result = cc_ntp.distro_ntp_client_configs(distro)
+ self.assertEqual(base, result)
+ # for distros with delta, ensure the merged config values match
+ # what is set in the delta
+ for distro in delta.keys():
+ result = cc_ntp.distro_ntp_client_configs(distro)
+ for client in delta[distro].keys():
+ for key in delta[distro][client].keys():
+ self.assertEqual(
+ delta[distro][client][key], result[client][key]
+ )
+
+ def _get_expected_pools(self, pools, distro, client):
+ if client in ["ntp", "chrony"]:
+ if client == "ntp" and distro == "alpine":
+ # NTP for Alpine Linux is Busybox's ntp which does not
+ # support 'pool' lines in its configuration file.
+ expected_pools = []
+ else:
+ expected_pools = [
+ "pool {0} iburst".format(pool) for pool in pools
+ ]
+ elif client == "systemd-timesyncd":
+ expected_pools = " ".join(pools)
+
+ return expected_pools
+
+ def _get_expected_servers(self, servers, distro, client):
+ if client in ["ntp", "chrony"]:
+ if client == "ntp" and distro == "alpine":
+ # NTP for Alpine Linux is Busybox's ntp which only supports
+ # 'server' lines without iburst option.
+ expected_servers = [
+ "server {0}".format(srv) for srv in servers
+ ]
+ else:
+ expected_servers = [
+ "server {0} iburst".format(srv) for srv in servers
+ ]
+ elif client == "systemd-timesyncd":
+ expected_servers = " ".join(servers)
+
+ return expected_servers
+
+ def test_ntp_handler_real_distro_ntp_templates(self):
+ """Test ntp handler renders the shipped distro ntp client templates."""
+ pools = ["0.mycompany.pool.ntp.org", "3.mycompany.pool.ntp.org"]
+ servers = ["192.168.23.3", "192.168.23.4"]
+ for client in ["ntp", "systemd-timesyncd", "chrony"]:
+ for distro in cc_ntp.distros:
+ distro_cfg = cc_ntp.distro_ntp_client_configs(distro)
+ ntpclient = distro_cfg[client]
+ confpath = os.path.join(
+ self.new_root, ntpclient.get("confpath")[1:]
+ )
+ template = ntpclient.get("template_name")
+ # find sourcetree template file
+ root_dir = (
+ dirname(dirname(os.path.realpath(util.__file__)))
+ + "/templates"
+ )
+ source_fn = self._get_template_path(
+ template, distro, basepath=root_dir
+ )
+ template_fn = self._get_template_path(template, distro)
+ # don't fail if cloud-init doesn't have a template for
+ # a distro,client pair
+ if not os.path.exists(source_fn):
+ continue
+ # Create a copy in our tmp_dir
+ shutil.copy(source_fn, template_fn)
+ cc_ntp.write_ntp_config_template(
+ distro,
+ servers=servers,
+ pools=pools,
+ path=confpath,
+ template_fn=template_fn,
+ )
+ content = util.load_file(confpath)
+ if client in ["ntp", "chrony"]:
+ content_lines = content.splitlines()
+ expected_servers = self._get_expected_servers(
+ servers, distro, client
+ )
+ print("distro=%s client=%s" % (distro, client))
+ for sline in expected_servers:
+ self.assertIn(
+ sline,
+ content_lines,
+ "failed to render {0} conf for distro:{1}".format(
+ client, distro
+ ),
+ )
+ expected_pools = self._get_expected_pools(
+ pools, distro, client
+ )
+ if expected_pools != []:
+ for pline in expected_pools:
+ self.assertIn(
+ pline,
+ content_lines,
+ "failed to render {0} conf"
+ " for distro:{1}".format(client, distro),
+ )
+ elif client == "systemd-timesyncd":
+ expected_servers = self._get_expected_servers(
+ servers, distro, client
+ )
+ expected_pools = self._get_expected_pools(
+ pools, distro, client
+ )
+ expected_content = (
+ "# cloud-init generated file\n"
+ + "# See timesyncd.conf(5) for details.\n\n"
+ + "[Time]\nNTP=%s %s \n"
+ % (expected_servers, expected_pools)
+ )
+ self.assertEqual(expected_content, content)
+
+ def test_no_ntpcfg_does_nothing(self):
+ """When no ntp section is defined handler logs a warning and noops."""
+ cc_ntp.handle("cc_ntp", {}, None, None, [])
+ self.assertEqual(
+ "DEBUG: Skipping module named cc_ntp, "
+ "not present or disabled by cfg\n",
+ self.logs.getvalue(),
+ )
+
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_schema_validation_allows_empty_ntp_config(
+ self, m_select
+ ):
+ """Ntp schema validation allows for an empty ntp: configuration."""
+ valid_empty_configs = [{"ntp": {}}, {"ntp": None}]
+ for valid_empty_config in valid_empty_configs:
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", valid_empty_config, mycloud, None, [])
+ if distro == "alpine":
+ # _mock_ntp_client_config call above did not specify a
+ # client value and so it defaults to "ntp" which on
+ # Alpine Linux only supports servers and not pools.
+
+ servers = cc_ntp.generate_server_names(mycloud.distro.name)
+ self.assertEqual(
+ "servers {0}\npools []\n".format(servers),
+ util.load_file(confpath),
+ )
+ else:
+ pools = cc_ntp.generate_server_names(mycloud.distro.name)
+ self.assertEqual(
+ "servers []\npools {0}\n".format(pools),
+ util.load_file(confpath),
+ )
+ self.assertNotIn(
+ "Invalid cloud-config provided:", self.logs.getvalue()
+ )
+
+ @skipUnlessJsonSchema()
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_schema_validation_warns_non_string_item_type(
+ self, m_sel
+ ):
+ """Ntp schema validation warns of non-strings in pools or servers.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {"ntp": {"pools": [123], "servers": ["valid", None]}}
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ m_sel.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nntp.pools.0: 123 is not of"
+ " type 'string'\nntp.servers.1: None is not of type 'string'",
+ self.logs.getvalue(),
+ )
+ self.assertEqual(
+ "servers ['valid', None]\npools [123]\n",
+ util.load_file(confpath),
+ )
+
+ @skipUnlessJsonSchema()
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_schema_validation_warns_of_non_array_type(
+ self, m_select
+ ):
+ """Ntp schema validation warns of non-array pools or servers types.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {"ntp": {"pools": 123, "servers": "non-array"}}
+
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nntp.pools: 123 is not of type"
+ " 'array'\nntp.servers: 'non-array' is not of type 'array'",
+ self.logs.getvalue(),
+ )
+ self.assertEqual(
+ "servers non-array\npools 123\n", util.load_file(confpath)
+ )
+
+ @skipUnlessJsonSchema()
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_schema_validation_warns_invalid_key_present(
+ self, m_select
+ ):
+ """Ntp schema validation warns of invalid keys present in ntp config.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {
+ "ntp": {"invalidkey": 1, "pools": ["0.mycompany.pool.ntp.org"]}
+ }
+ for distro in cc_ntp.distros:
+ if distro != "alpine":
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nntp: Additional"
+ " properties are not allowed ('invalidkey' was"
+ " unexpected)",
+ self.logs.getvalue(),
+ )
+ self.assertEqual(
+ "servers []\npools ['0.mycompany.pool.ntp.org']\n",
+ util.load_file(confpath),
+ )
+
+ @skipUnlessJsonSchema()
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_schema_validation_warns_of_duplicates(self, m_select):
+ """Ntp schema validation warns of duplicates in servers or pools.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {
+ "ntp": {
+ "pools": ["0.mypool.org", "0.mypool.org"],
+ "servers": ["10.0.0.1", "10.0.0.1"],
+ }
+ }
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", invalid_config, mycloud, None, [])
+ self.assertIn(
+ "Invalid cloud-config provided:\nntp.pools: ['0.mypool.org',"
+ " '0.mypool.org'] has non-unique elements\nntp.servers: "
+ "['10.0.0.1', '10.0.0.1'] has non-unique elements",
+ self.logs.getvalue(),
+ )
+ self.assertEqual(
+ "servers ['10.0.0.1', '10.0.0.1']\n"
+ "pools ['0.mypool.org', '0.mypool.org']\n",
+ util.load_file(confpath),
+ )
+
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_timesyncd(self, m_select):
+ """Test ntp handler configures timesyncd"""
+ servers = ["192.168.2.1", "192.168.2.2"]
+ pools = ["0.mypool.org"]
+ cfg = {"ntp": {"servers": servers, "pools": pools}}
+ client = "systemd-timesyncd"
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(
+ distro=distro, client=client
+ )
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ cc_ntp.handle("cc_ntp", cfg, mycloud, None, [])
+ self.assertEqual(
+ "[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n",
+ util.load_file(confpath),
+ )
+
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_handler_enabled_false(self, m_select):
+ """Test ntp handler does not run if enabled: false"""
+ cfg = {"ntp": {"enabled": False}}
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ cc_ntp.handle("notimportant", cfg, mycloud, None, None)
+ self.assertEqual(0, m_select.call_count)
+
+ @mock.patch("cloudinit.distros.subp")
+ @mock.patch("cloudinit.config.cc_ntp.subp")
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ @mock.patch("cloudinit.distros.Distro.uses_systemd")
+ def test_ntp_the_whole_package(self, m_sysd, m_select, m_subp, m_dsubp):
+ """Test enabled config renders template, and restarts service"""
+ cfg = {"ntp": {"enabled": True}}
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(distro=distro)
+ confpath = ntpconfig["confpath"]
+ service_name = ntpconfig["service_name"]
+ m_select.return_value = ntpconfig
+
+ hosts = cc_ntp.generate_server_names(mycloud.distro.name)
+ uses_systemd = True
+ expected_service_call = [
+ "systemctl",
+ "reload-or-restart",
+ service_name,
+ ]
+ expected_content = "servers []\npools {0}\n".format(hosts)
+
+ if distro == "alpine":
+ uses_systemd = False
+ expected_service_call = ["rc-service", service_name, "restart"]
+ # _mock_ntp_client_config call above did not specify a client
+ # value and so it defaults to "ntp" which on Alpine Linux only
+ # supports servers and not pools.
+ expected_content = "servers {0}\npools []\n".format(hosts)
+
+ m_sysd.return_value = uses_systemd
+ with mock.patch("cloudinit.config.cc_ntp.util") as m_util:
+ # allow use of util.mergemanydict
+ m_util.mergemanydict.side_effect = util.mergemanydict
+ # default client is present
+ m_subp.which.return_value = True
+ # use the config 'enabled' value
+ m_util.is_false.return_value = util.is_false(
+ cfg["ntp"]["enabled"]
+ )
+ cc_ntp.handle("notimportant", cfg, mycloud, None, None)
+ m_dsubp.subp.assert_called_with(
+ expected_service_call, capture=True
+ )
+
+ self.assertEqual(expected_content, util.load_file(confpath))
+
+ @mock.patch("cloudinit.util.system_info")
+ def test_opensuse_picks_chrony(self, m_sysinfo):
+ """Test opensuse picks chrony or ntp on certain distro versions"""
+ # < 15.0 => ntp
+ m_sysinfo.return_value = {"dist": ("openSUSE", "13.2", "Harlequin")}
+ mycloud = self._get_cloud("opensuse")
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ self.assertEqual("ntp", expected_client)
+
+ # >= 15.0 and not openSUSE => chrony
+ m_sysinfo.return_value = {
+ "dist": ("SLES", "15.0", "SUSE Linux Enterprise Server 15")
+ }
+ mycloud = self._get_cloud("sles")
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ self.assertEqual("chrony", expected_client)
+
+ # >= 15.0 and openSUSE and ver != 42 => chrony
+ m_sysinfo.return_value = {
+ "dist": ("openSUSE Tumbleweed", "20180326", "timbleweed")
+ }
+ mycloud = self._get_cloud("opensuse")
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ self.assertEqual("chrony", expected_client)
+
+ @mock.patch("cloudinit.util.system_info")
+ def test_ubuntu_xenial_picks_ntp(self, m_sysinfo):
+ """Test Ubuntu picks ntp on xenial release"""
+
+ m_sysinfo.return_value = {"dist": ("Ubuntu", "16.04", "xenial")}
+ mycloud = self._get_cloud("ubuntu")
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ self.assertEqual("ntp", expected_client)
+
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_snappy_system_picks_timesyncd(self, m_which):
+ """Test snappy systems prefer installed clients"""
+
+ # we are on ubuntu-core here
+ self.m_snappy.return_value = True
+
+ # ubuntu core systems will have timesyncd installed
+ m_which.side_effect = iter(
+ [None, "/lib/systemd/systemd-timesyncd", None, None, None]
+ )
+ distro = "ubuntu"
+ mycloud = self._get_cloud(distro)
+ distro_configs = cc_ntp.distro_ntp_client_configs(distro)
+ expected_client = "systemd-timesyncd"
+ expected_cfg = distro_configs[expected_client]
+ expected_calls = []
+ # we only get to timesyncd
+ for client in mycloud.distro.preferred_ntp_clients[0:2]:
+ cfg = distro_configs[client]
+ expected_calls.append(mock.call(cfg["check_exe"]))
+ result = cc_ntp.select_ntp_client(None, mycloud.distro)
+ m_which.assert_has_calls(expected_calls)
+ self.assertEqual(sorted(expected_cfg), sorted(cfg))
+ self.assertEqual(sorted(expected_cfg), sorted(result))
+
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_ntp_distro_searches_all_preferred_clients(self, m_which):
+ """Test select_ntp_client search all distro perferred clients"""
+ # nothing is installed
+ m_which.return_value = None
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ distro_configs = cc_ntp.distro_ntp_client_configs(distro)
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ expected_cfg = distro_configs[expected_client]
+ expected_calls = []
+ for client in mycloud.distro.preferred_ntp_clients:
+ cfg = distro_configs[client]
+ expected_calls.append(mock.call(cfg["check_exe"]))
+ cc_ntp.select_ntp_client({}, mycloud.distro)
+ m_which.assert_has_calls(expected_calls)
+ self.assertEqual(sorted(expected_cfg), sorted(cfg))
+
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which):
+ """Test user_cfg.ntp_client='auto' defaults to distro search"""
+ # nothing is installed
+ m_which.return_value = None
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ distro_configs = cc_ntp.distro_ntp_client_configs(distro)
+ expected_client = mycloud.distro.preferred_ntp_clients[0]
+ expected_cfg = distro_configs[expected_client]
+ expected_calls = []
+ for client in mycloud.distro.preferred_ntp_clients:
+ cfg = distro_configs[client]
+ expected_calls.append(mock.call(cfg["check_exe"]))
+ cc_ntp.select_ntp_client("auto", mycloud.distro)
+ m_which.assert_has_calls(expected_calls)
+ self.assertEqual(sorted(expected_cfg), sorted(cfg))
+
+ @mock.patch("cloudinit.config.cc_ntp.write_ntp_config_template")
+ @mock.patch("cloudinit.cloud.Cloud.get_template_filename")
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_ntp_custom_client_overrides_installed_clients(
+ self, m_which, m_tmpfn, m_write
+ ):
+ """Test user client is installed despite other clients present"""
+ client = "ntpdate"
+ cfg = {"ntp": {"ntp_client": client}}
+ for distro in cc_ntp.distros:
+ # client is not installed
+ m_which.side_effect = iter([None])
+ mycloud = self._get_cloud(distro)
+ with mock.patch.object(
+ mycloud.distro, "install_packages"
+ ) as m_install:
+ cc_ntp.handle("notimportant", cfg, mycloud, None, None)
+ m_install.assert_called_with([client])
+ m_which.assert_called_with(client)
+
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_ntp_system_config_overrides_distro_builtin_clients(self, m_which):
+ """Test distro system_config overrides builtin preferred ntp clients"""
+ system_client = "chrony"
+ sys_cfg = {"ntp_client": system_client}
+ # no clients installed
+ m_which.return_value = None
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro, sys_cfg=sys_cfg)
+ distro_configs = cc_ntp.distro_ntp_client_configs(distro)
+ expected_cfg = distro_configs[system_client]
+ result = cc_ntp.select_ntp_client(None, mycloud.distro)
+ self.assertEqual(sorted(expected_cfg), sorted(result))
+ m_which.assert_has_calls([])
+
+ @mock.patch("cloudinit.config.cc_ntp.subp.which")
+ def test_ntp_user_config_overrides_system_cfg(self, m_which):
+ """Test user-data overrides system_config ntp_client"""
+ system_client = "chrony"
+ sys_cfg = {"ntp_client": system_client}
+ user_client = "systemd-timesyncd"
+ # no clients installed
+ m_which.return_value = None
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro, sys_cfg=sys_cfg)
+ distro_configs = cc_ntp.distro_ntp_client_configs(distro)
+ expected_cfg = distro_configs[user_client]
+ result = cc_ntp.select_ntp_client(user_client, mycloud.distro)
+ self.assertEqual(sorted(expected_cfg), sorted(result))
+ m_which.assert_has_calls([])
+
+ @mock.patch("cloudinit.config.cc_ntp.install_ntp_client")
+ def test_ntp_user_provided_config_with_template(self, m_install):
+ custom = r"\n#MyCustomTemplate"
+ user_template = NTP_TEMPLATE + custom
+ confpath = os.path.join(self.new_root, "etc/myntp/myntp.conf")
+ cfg = {
+ "ntp": {
+ "pools": ["mypool.org"],
+ "ntp_client": "myntpd",
+ "config": {
+ "check_exe": "myntpd",
+ "confpath": confpath,
+ "packages": ["myntp"],
+ "service_name": "myntp",
+ "template": user_template,
+ },
+ }
+ }
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR"
+ with mock.patch(mock_path, self.new_root):
+ cc_ntp.handle("notimportant", cfg, mycloud, None, None)
+ self.assertEqual(
+ "servers []\npools ['mypool.org']\n%s" % custom,
+ util.load_file(confpath),
+ )
+
+ @mock.patch("cloudinit.config.cc_ntp.supplemental_schema_validation")
+ @mock.patch("cloudinit.config.cc_ntp.install_ntp_client")
+ @mock.patch("cloudinit.config.cc_ntp.select_ntp_client")
+ def test_ntp_user_provided_config_template_only(
+ self, m_select, m_install, m_schema
+ ):
+ """Test custom template for default client"""
+ custom = r"\n#MyCustomTemplate"
+ user_template = NTP_TEMPLATE + custom
+ client = "chrony"
+ cfg = {
+ "pools": ["mypool.org"],
+ "ntp_client": client,
+ "config": {
+ "template": user_template,
+ },
+ }
+ expected_merged_cfg = {
+ "check_exe": "chronyd",
+ "confpath": "{tmpdir}/client.conf".format(tmpdir=self.new_root),
+ "template_name": "client.conf",
+ "template": user_template,
+ "service_name": "chrony",
+ "packages": ["chrony"],
+ }
+ for distro in cc_ntp.distros:
+ mycloud = self._get_cloud(distro)
+ ntpconfig = self._mock_ntp_client_config(
+ client=client, distro=distro
+ )
+ confpath = ntpconfig["confpath"]
+ m_select.return_value = ntpconfig
+ mock_path = "cloudinit.config.cc_ntp.temp_utils._TMPDIR"
+ with mock.patch(mock_path, self.new_root):
+ cc_ntp.handle(
+ "notimportant", {"ntp": cfg}, mycloud, None, None
+ )
+ self.assertEqual(
+ "servers []\npools ['mypool.org']\n%s" % custom,
+ util.load_file(confpath),
+ )
+ m_schema.assert_called_with(expected_merged_cfg)
+
+
+class TestSupplementalSchemaValidation(CiTestCase):
+ def test_error_on_missing_keys(self):
+ """ValueError raised reporting any missing required ntp:config keys"""
+ cfg = {}
+ match = (
+ r"Invalid ntp configuration:\\nMissing required ntp:config"
+ " keys: check_exe, confpath, packages, service_name"
+ )
+ with self.assertRaisesRegex(ValueError, match):
+ cc_ntp.supplemental_schema_validation(cfg)
+
+ def test_error_requiring_either_template_or_template_name(self):
+ """ValueError raised if both template not template_name are None."""
+ cfg = {
+ "confpath": "someconf",
+ "check_exe": "",
+ "service_name": "",
+ "template": None,
+ "template_name": None,
+ "packages": [],
+ }
+ match = (
+ r"Invalid ntp configuration:\\nEither ntp:config:template"
+ " or ntp:config:template_name values are required"
+ )
+ with self.assertRaisesRegex(ValueError, match):
+ cc_ntp.supplemental_schema_validation(cfg)
+
+ def test_error_on_non_list_values(self):
+ """ValueError raised when packages is not of type list."""
+ cfg = {
+ "confpath": "someconf",
+ "check_exe": "",
+ "service_name": "",
+ "template": "asdf",
+ "template_name": None,
+ "packages": "NOPE",
+ }
+ match = (
+ r"Invalid ntp configuration:\\nExpected a list of required"
+ " package names for ntp:config:packages. Found \\(NOPE\\)"
+ )
+ with self.assertRaisesRegex(ValueError, match):
+ cc_ntp.supplemental_schema_validation(cfg)
+
+ def test_error_on_non_string_values(self):
+ """ValueError raised for any values expected as string type."""
+ cfg = {
+ "confpath": 1,
+ "check_exe": 2,
+ "service_name": 3,
+ "template": 4,
+ "template_name": 5,
+ "packages": [],
+ }
+ errors = [
+ "Expected a config file path ntp:config:confpath. Found (1)",
+ "Expected a string type for ntp:config:check_exe. Found (2)",
+ "Expected a string type for ntp:config:service_name. Found (3)",
+ "Expected a string type for ntp:config:template. Found (4)",
+ "Expected a string type for ntp:config:template_name. Found (5)",
+ ]
+ with self.assertRaises(ValueError) as context_mgr:
+ cc_ntp.supplemental_schema_validation(cfg)
+ error_msg = str(context_mgr.exception)
+ for error in errors:
+ self.assertIn(error, error_msg)
+
+
+# vi: ts=4 expandtab