summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules/test_ntp_servers.py
blob: c777a641d24a777fc75d529773f8e2f57f5f219e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Integration test for the ntp module's ntp functionality.

This test specifies the use of the `ntp` NTP client, and ensures that the given
NTP servers are configured as expected.

(This is ported from ``tests/cloud_tests/testcases/modules/ntp_servers.yaml``,
``tests/cloud_tests/testcases/modules/ntp_pools.yaml``,
and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``)
"""
import re

import yaml
import pytest

from tests.integration_tests.instances import IntegrationInstance

USER_DATA = """\
#cloud-config
ntp:
  ntp_client: ntp
  servers:
      - 172.16.15.14
      - 172.16.17.18
  pools:
      - 0.cloud-init.mypool
      - 1.cloud-init.mypool
      - 172.16.15.15
"""

EXPECTED_SERVERS = yaml.safe_load(USER_DATA)["ntp"]["servers"]
EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"]


@pytest.mark.user_data(USER_DATA)
class TestNtpServers:

    def test_ntp_installed(self, class_client: IntegrationInstance):
        """Test that `ntpd --version` succeeds, indicating installation."""
        assert class_client.execute("ntpd --version").ok

    def test_dist_config_file_is_empty(self,
                                       class_client: IntegrationInstance):
        """Test that the distributed config file is empty.

        (This test is skipped on all currently supported Ubuntu releases, so
        may not actually be needed any longer.)
        """
        if class_client.execute("test -e /etc/ntp.conf.dist").failed:
            pytest.skip("/etc/ntp.conf.dist does not exist")
        dist_file = class_client.read_from_file("/etc/ntp.conf.dist")
        assert 0 == len(dist_file.strip().splitlines())

    def test_ntp_entries(self, class_client: IntegrationInstance):
        ntp_conf = class_client.read_from_file("/etc/ntp.conf")
        for expected_server in EXPECTED_SERVERS:
            assert re.search(
                r"^server {} iburst".format(expected_server),
                ntp_conf,
                re.MULTILINE
            )
        for expected_pool in EXPECTED_POOLS:
            assert re.search(
                r"^pool {} iburst".format(expected_pool),
                ntp_conf,
                re.MULTILINE
            )

    def test_ntpq_servers(self, class_client: IntegrationInstance):
        result = class_client.execute("ntpq -p -w -n")
        assert result.ok
        for expected_server_or_pool in [*EXPECTED_SERVERS, *EXPECTED_POOLS]:
            assert expected_server_or_pool in result.stdout


CHRONY_DATA = """\
#cloud-config
ntp:
  enabled: true
  ntp_client: chrony
  servers:
      - 172.16.15.14
"""


@pytest.mark.user_data(CHRONY_DATA)
def test_chrony(client: IntegrationInstance):
    if client.execute('test -f /etc/chrony.conf').ok:
        chrony_conf = '/etc/chrony.conf'
    else:
        chrony_conf = '/etc/chrony/chrony.conf'
    contents = client.read_from_file(chrony_conf)
    assert 'server 172.16.15.14' in contents


TIMESYNCD_DATA = """\
#cloud-config
ntp:
  enabled: true
  ntp_client: systemd-timesyncd
  servers:
      - 172.16.15.14
"""


@pytest.mark.user_data(TIMESYNCD_DATA)
def test_timesyncd(client: IntegrationInstance):
    contents = client.read_from_file(
        '/etc/systemd/timesyncd.conf.d/cloud-init.conf'
    )
    assert 'NTP=172.16.15.14' in contents


EMPTY_NTP = """\
#cloud-config
ntp:
  ntp_client: ntp
  pools: []
  servers: []
"""


@pytest.mark.user_data(EMPTY_NTP)
def test_empty_ntp(client: IntegrationInstance):
    assert client.execute('ntpd --version').ok
    assert client.execute('test -f /etc/ntp.conf.dist').failed
    assert 'pool.ntp.org iburst' in client.execute(
        'grep -v "^#" /etc/ntp.conf'
    )