summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorScott Moser <smoser@brickies.net>2017-05-26 15:53:48 -0400
committerScott Moser <smoser@brickies.net>2017-05-26 15:53:48 -0400
commitcc9748215f612b8c600c1080c60af71fe7624c47 (patch)
tree3f008cb6350adf6bc003d2d5ad2d851c9506b81e /tests
parent3dd56b4504003928bace87a7e67b08e9376fc6c1 (diff)
parent16a7302f6acb69adb0aee75eaf12392fa3688853 (diff)
downloadvyos-cloud-init-cc9748215f612b8c600c1080c60af71fe7624c47.tar.gz
vyos-cloud-init-cc9748215f612b8c600c1080c60af71fe7624c47.zip
merge from master at 0.7.9-153-g16a7302f
Diffstat (limited to 'tests')
-rw-r--r--tests/cloud_tests/configs/modules/ntp_pools.yaml10
-rw-r--r--tests/cloud_tests/configs/modules/ntp_servers.yaml16
-rw-r--r--tests/cloud_tests/configs/modules/snappy.yaml4
-rw-r--r--tests/cloud_tests/releases.yaml7
-rw-r--r--tests/cloud_tests/testcases/modules/ntp.py4
-rw-r--r--tests/cloud_tests/testcases/modules/ntp_pools.py18
-rw-r--r--tests/cloud_tests/testcases/modules/ntp_servers.py19
-rw-r--r--tests/cloud_tests/testcases/modules/snappy.py7
-rw-r--r--tests/unittests/helpers.py40
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py5
-rw-r--r--tests/unittests/test_datasource/test_azure.py320
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py4
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py10
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py25
-rw-r--r--tests/unittests/test_datasource/test_gce.py4
-rw-r--r--tests/unittests/test_datasource/test_maas.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py4
-rw-r--r--tests/unittests/test_datasource/test_openstack.py4
-rw-r--r--tests/unittests/test_datasource/test_ovf.py2
-rw-r--r--tests/unittests/test_distros/test_netconfig.py57
-rw-r--r--tests/unittests/test_distros/test_resolv.py2
-rw-r--r--tests/unittests/test_ds_identify.py300
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py94
-rw-r--r--tests/unittests/test_handler/test_handler_disk_setup.py77
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py367
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py4
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py59
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py4
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py4
-rw-r--r--tests/unittests/test_helpers.py2
-rw-r--r--tests/unittests/test_net.py222
-rw-r--r--tests/unittests/test_util.py82
32 files changed, 1365 insertions, 414 deletions
diff --git a/tests/cloud_tests/configs/modules/ntp_pools.yaml b/tests/cloud_tests/configs/modules/ntp_pools.yaml
index bd0ac292..e040cc32 100644
--- a/tests/cloud_tests/configs/modules/ntp_pools.yaml
+++ b/tests/cloud_tests/configs/modules/ntp_pools.yaml
@@ -5,10 +5,9 @@ cloud_config: |
#cloud-config
ntp:
pools:
- - 0.pool.ntp.org
- - 1.pool.ntp.org
- - 2.pool.ntp.org
- - 3.pool.ntp.org
+ - 0.cloud-init.mypool
+ - 1.cloud-init.mypool
+ - 172.16.15.14
collect_scripts:
ntp_installed_pools: |
#!/bin/bash
@@ -19,5 +18,8 @@ collect_scripts:
ntp_conf_pools: |
#!/bin/bash
grep '^pool' /etc/ntp.conf
+ ntpq_servers: |
+ #!/bin/sh
+ ntpq -p -w
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/configs/modules/ntp_servers.yaml b/tests/cloud_tests/configs/modules/ntp_servers.yaml
index 934b9c5d..e0564a03 100644
--- a/tests/cloud_tests/configs/modules/ntp_servers.yaml
+++ b/tests/cloud_tests/configs/modules/ntp_servers.yaml
@@ -5,16 +5,20 @@ cloud_config: |
#cloud-config
ntp:
servers:
- - pool.ntp.org
+ - 172.16.15.14
+ - 172.16.17.18
collect_scripts:
ntp_installed_servers: |
- #!/bin/bash
- dpkg -l | grep ntp | wc -l
+ #!/bin/sh
+ dpkg -l | grep -c ntp
ntp_conf_dist_servers: |
- #!/bin/bash
- ls /etc/ntp.conf.dist | wc -l
+ #!/bin/sh
+ cat /etc/ntp.conf.dist | wc -l
ntp_conf_servers: |
- #!/bin/bash
+ #!/bin/sh
grep '^server' /etc/ntp.conf
+ ntpq_servers: |
+ #!/bin/sh
+ ntpq -p -w
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/configs/modules/snappy.yaml b/tests/cloud_tests/configs/modules/snappy.yaml
index 923bfe12..0e7dc852 100644
--- a/tests/cloud_tests/configs/modules/snappy.yaml
+++ b/tests/cloud_tests/configs/modules/snappy.yaml
@@ -6,8 +6,8 @@ cloud_config: |
snappy:
system_snappy: auto
collect_scripts:
- snap_version: |
+ snapd: |
#!/bin/bash
- snap --version
+ dpkg -s snapd
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/releases.yaml b/tests/cloud_tests/releases.yaml
index 3ffa68f0..183f78c1 100644
--- a/tests/cloud_tests/releases.yaml
+++ b/tests/cloud_tests/releases.yaml
@@ -49,6 +49,13 @@ releases:
#alias: ubuntu/zesty/default
alias: z
sstreams_server: https://cloud-images.ubuntu.com/daily
+ artful:
+ enabled: true
+ platform_ident:
+ lxd:
+ #alias: ubuntu/artful/default
+ alias: a
+ sstreams_server: https://cloud-images.ubuntu.com/daily
jessie:
platform_ident:
lxd:
diff --git a/tests/cloud_tests/testcases/modules/ntp.py b/tests/cloud_tests/testcases/modules/ntp.py
index b1119257..82d32880 100644
--- a/tests/cloud_tests/testcases/modules/ntp.py
+++ b/tests/cloud_tests/testcases/modules/ntp.py
@@ -13,9 +13,9 @@ class TestNtp(base.CloudTestCase):
self.assertEqual(1, int(out))
def test_ntp_dist_entries(self):
- """Test dist config file has one entry"""
+ """Test dist config file is empty"""
out = self.get_data_file('ntp_conf_dist_empty')
- self.assertEqual(1, int(out))
+ self.assertEqual(0, int(out))
def test_ntp_entires(self):
"""Test config entries"""
diff --git a/tests/cloud_tests/testcases/modules/ntp_pools.py b/tests/cloud_tests/testcases/modules/ntp_pools.py
index d80cb673..ff6d8fa4 100644
--- a/tests/cloud_tests/testcases/modules/ntp_pools.py
+++ b/tests/cloud_tests/testcases/modules/ntp_pools.py
@@ -13,16 +13,22 @@ class TestNtpPools(base.CloudTestCase):
self.assertEqual(1, int(out))
def test_ntp_dist_entries(self):
- """Test dist config file has one entry"""
+ """Test dist config file is empty"""
out = self.get_data_file('ntp_conf_dist_pools')
- self.assertEqual(1, int(out))
+ self.assertEqual(0, int(out))
def test_ntp_entires(self):
"""Test config entries"""
out = self.get_data_file('ntp_conf_pools')
- self.assertIn('pool 0.pool.ntp.org iburst', out)
- self.assertIn('pool 1.pool.ntp.org iburst', out)
- self.assertIn('pool 2.pool.ntp.org iburst', out)
- self.assertIn('pool 3.pool.ntp.org iburst', out)
+ pools = self.cloud_config.get('ntp').get('pools')
+ for pool in pools:
+ self.assertIn('pool %s iburst' % pool, out)
+
+ def test_ntpq_servers(self):
+ """Test ntpq output has configured servers"""
+ out = self.get_data_file('ntpq_servers')
+ pools = self.cloud_config.get('ntp').get('pools')
+ for pool in pools:
+ self.assertIn(pool, out)
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/modules/ntp_servers.py b/tests/cloud_tests/testcases/modules/ntp_servers.py
index 4879bb6f..4010cf80 100644
--- a/tests/cloud_tests/testcases/modules/ntp_servers.py
+++ b/tests/cloud_tests/testcases/modules/ntp_servers.py
@@ -13,13 +13,22 @@ class TestNtpServers(base.CloudTestCase):
self.assertEqual(1, int(out))
def test_ntp_dist_entries(self):
- """Test dist config file has one entry"""
+ """Test dist config file is empty"""
out = self.get_data_file('ntp_conf_dist_servers')
- self.assertEqual(1, int(out))
+ self.assertEqual(0, int(out))
- def test_ntp_entires(self):
- """Test config entries"""
+ def test_ntp_entries(self):
+ """Test config server entries"""
out = self.get_data_file('ntp_conf_servers')
- self.assertIn('server pool.ntp.org iburst', out)
+ servers = self.cloud_config.get('ntp').get('servers')
+ for server in servers:
+ self.assertIn('server %s iburst' % server, out)
+
+ def test_ntpq_servers(self):
+ """Test ntpq output has configured servers"""
+ out = self.get_data_file('ntpq_servers')
+ servers = self.cloud_config.get('ntp').get('servers')
+ for server in servers:
+ self.assertIn(server, out)
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/modules/snappy.py b/tests/cloud_tests/testcases/modules/snappy.py
index 3e2f5924..b92271c1 100644
--- a/tests/cloud_tests/testcases/modules/snappy.py
+++ b/tests/cloud_tests/testcases/modules/snappy.py
@@ -9,10 +9,7 @@ class TestSnappy(base.CloudTestCase):
def test_snappy_version(self):
"""Test snappy version output"""
- out = self.get_data_file('snap_version')
- self.assertIn('snap ', out)
- self.assertIn('snapd ', out)
- self.assertIn('series ', out)
- self.assertIn('ubuntu ', out)
+ out = self.get_data_file('snapd')
+ self.assertIn('Status: install ok installed', out)
# vi: ts=4 expandtab
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 90e2431f..9ff15993 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -3,6 +3,8 @@
from __future__ import print_function
import functools
+import json
+import logging
import os
import shutil
import sys
@@ -17,6 +19,10 @@ try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from io import StringIO
from cloudinit import helpers as ch
from cloudinit import util
@@ -86,6 +92,27 @@ class TestCase(unittest2.TestCase):
class CiTestCase(TestCase):
"""This is the preferred test case base class unless user
needs other test case classes below."""
+
+ # Subclass overrides for specific test behavior
+ # Whether or not a unit test needs logfile setup
+ with_logs = False
+
+ def setUp(self):
+ super(CiTestCase, self).setUp()
+ if self.with_logs:
+ # Create a log handler so unit tests can search expected logs.
+ logger = logging.getLogger()
+ self.logs = StringIO()
+ handler = logging.StreamHandler(self.logs)
+ self.old_handlers = logger.handlers
+ logger.handlers = [handler]
+
+ def tearDown(self):
+ if self.with_logs:
+ # Remove the handler we setup
+ logging.getLogger().handlers = self.old_handlers
+ super(CiTestCase, self).tearDown()
+
def tmp_dir(self, dir=None, cleanup=True):
# return a full path to a temporary directory that will be cleaned up.
if dir is None:
@@ -105,7 +132,7 @@ class CiTestCase(TestCase):
return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
-class ResourceUsingTestCase(TestCase):
+class ResourceUsingTestCase(CiTestCase):
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
self.resource_path = None
@@ -228,8 +255,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def reRoot(self, root=None):
if root is None:
- root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, root)
+ root = self.tmp_dir()
self.patchUtils(root)
self.patchOS(root)
return root
@@ -255,7 +281,7 @@ def populate_dir(path, files):
os.makedirs(path)
ret = []
for (name, content) in files.items():
- p = os.path.join(path, name)
+ p = os.path.sep.join([path, name])
util.ensure_dir(os.path.dirname(p))
with open(p, "wb") as fp:
if isinstance(content, six.binary_type):
@@ -280,6 +306,12 @@ def dir2dict(startdir, prefix=None):
return flist
+def json_dumps(data):
+ # print data in nicely formatted json.
+ return json.dumps(data, indent=1, sort_keys=True,
+ separators=(',', ': '))
+
+
def wrap_and_call(prefix, mocks, func, *args, **kwargs):
"""
call func(args, **kwargs) with mocks applied, then unapplies mocks
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 63a2b04d..9c46abc1 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -17,7 +17,8 @@ import tempfile
from cloudinit import helpers
from cloudinit import util
-from unittest import TestCase
+
+from ..helpers import TestCase
import cloudinit.sources.DataSourceAltCloud as dsac
@@ -419,7 +420,7 @@ class TestReadUserDataCallback(TestCase):
'''Test read_user_data_callback() no files are found.'''
_remove_user_data_files(self.mount_dir)
- self.assertEqual(None, dsac.read_user_data_callback(self.mount_dir))
+ self.assertIsNone(dsac.read_user_data_callback(self.mount_dir))
def force_arch(arch=None):
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 8d22bb59..852ec703 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,10 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit import helpers
-from cloudinit.util import b64e, decode_binary, load_file
-from cloudinit.sources import DataSourceAzure
+from cloudinit.util import b64e, decode_binary, load_file, write_file
+from cloudinit.sources import DataSourceAzure as dsaz
+from cloudinit.util import find_freebsd_part
+from cloudinit.util import get_path_dev_freebsd
-from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
+from ..helpers import (CiTestCase, TestCase, populate_dir, mock,
+ ExitStack, PY26, SkipTest)
import crypt
import os
@@ -95,6 +98,40 @@ class TestAzureDataSource(TestCase):
for module, name, new in patches:
self.patches.enter_context(mock.patch.object(module, name, new))
+ def _get_mockds(self):
+ sysctl_out = "dev.storvsc.3.%pnpinfo: "\
+ "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\
+ "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n"
+ sysctl_out += "dev.storvsc.2.%pnpinfo: "\
+ "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\
+ "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n"
+ sysctl_out += "dev.storvsc.1.%pnpinfo: "\
+ "classid=32412632-86cb-44a2-9b5c-50d1417354f5 "\
+ "deviceid=00000000-0001-8899-0000-000000000000\n"
+ camctl_devbus = """
+scbus0 on ata0 bus 0
+scbus1 on ata1 bus 0
+scbus2 on blkvsc0 bus 0
+scbus3 on blkvsc1 bus 0
+scbus4 on storvsc2 bus 0
+scbus5 on storvsc3 bus 0
+scbus-1 on xpt0 bus 0
+ """
+ camctl_dev = """
+<Msft Virtual CD/ROM 1.0> at scbus1 target 0 lun 0 (cd0,pass0)
+<Msft Virtual Disk 1.0> at scbus2 target 0 lun 0 (da0,pass1)
+<Msft Virtual Disk 1.0> at scbus3 target 1 lun 0 (da1,pass2)
+ """
+ self.apply_patches([
+ (dsaz, 'get_dev_storvsc_sysctl', mock.MagicMock(
+ return_value=sysctl_out)),
+ (dsaz, 'get_camcontrol_dev_bus', mock.MagicMock(
+ return_value=camctl_devbus)),
+ (dsaz, 'get_camcontrol_dev', mock.MagicMock(
+ return_value=camctl_dev))
+ ])
+ return dsaz
+
def _get_ds(self, data, agent_command=None):
def dsdevs():
@@ -115,8 +152,7 @@ class TestAzureDataSource(TestCase):
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
- mod = DataSourceAzure
- mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.get_metadata_from_fabric = mock.MagicMock(return_value={
'public-keys': [],
@@ -125,19 +161,19 @@ class TestAzureDataSource(TestCase):
self.instance_id = 'test-instance-id'
self.apply_patches([
- (mod, 'list_possible_azure_ds_devs', dsdevs),
- (mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
- (mod, 'perform_hostname_bounce', mock.MagicMock()),
- (mod, 'get_hostname', mock.MagicMock()),
- (mod, 'set_hostname', mock.MagicMock()),
- (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
- (mod.util, 'read_dmi_data', mock.MagicMock(
+ (dsaz, 'list_possible_azure_ds_devs', dsdevs),
+ (dsaz, 'invoke_agent', _invoke_agent),
+ (dsaz, 'wait_for_files', _wait_for_files),
+ (dsaz, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+ (dsaz, 'perform_hostname_bounce', mock.MagicMock()),
+ (dsaz, 'get_hostname', mock.MagicMock()),
+ (dsaz, 'set_hostname', mock.MagicMock()),
+ (dsaz, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (dsaz.util, 'read_dmi_data', mock.MagicMock(
return_value=self.instance_id)),
])
- dsrc = mod.DataSourceAzureNet(
+ dsrc = dsaz.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
if agent_command is not None:
dsrc.ds_cfg['agent_command'] = agent_command
@@ -177,6 +213,34 @@ class TestAzureDataSource(TestCase):
return
raise AssertionError("XML is the same")
+ def test_get_resource_disk(self):
+ ds = self._get_mockds()
+ dev = ds.get_resource_disk_on_freebsd(1)
+ self.assertEqual("da1", dev)
+
+ @mock.patch('cloudinit.util.subp')
+ def test_find_freebsd_part_on_Azure(self, mock_subp):
+ glabel_out = '''
+gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1
+ label/rootfs N/A da0p2
+ label/swap N/A da0p3
+'''
+ mock_subp.return_value = (glabel_out, "")
+ res = find_freebsd_part("/dev/label/rootfs")
+ self.assertEqual("da0p2", res)
+
+ def test_get_path_dev_freebsd_on_Azure(self):
+ mnt_list = '''
+/dev/label/rootfs / ufs rw 1 1
+devfs /dev devfs rw,multilabel 0 0
+fdescfs /dev/fd fdescfs rw 0 0
+/dev/da1s1 /mnt/resource ufs rw 2 2
+'''
+ with mock.patch.object(os.path, 'exists',
+ return_value=True):
+ res = get_path_dev_freebsd('/etc', mnt_list)
+ self.assertIsNotNone(res)
+
def test_basic_seed_dir(self):
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
@@ -353,7 +417,7 @@ class TestAzureDataSource(TestCase):
cfg = dsrc.get_config_obj()
self.assertEqual(dsrc.device_name_to_device("ephemeral0"),
- DataSourceAzure.RESOURCE_DISK_PATH)
+ dsaz.RESOURCE_DISK_PATH)
assert 'disk_setup' in cfg
assert 'fs_setup' in cfg
self.assertIsInstance(cfg['disk_setup'], dict)
@@ -403,14 +467,13 @@ class TestAzureDataSource(TestCase):
# Make sure that the redacted password on disk is not used by CI
self.assertNotEqual(dsrc.cfg.get('password'),
- DataSourceAzure.DEF_PASSWD_REDACTION)
+ dsaz.DEF_PASSWD_REDACTION)
# Make sure that the password was really encrypted
et = ET.fromstring(on_disk_ovf)
for elem in et.iter():
if 'UserPassword' in elem.tag:
- self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION,
- elem.text)
+ self.assertEqual(dsaz.DEF_PASSWD_REDACTION, elem.text)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -459,17 +522,17 @@ class TestAzureBounce(TestCase):
def mock_out_azure_moving_parts(self):
self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'invoke_agent'))
+ mock.patch.object(dsaz, 'invoke_agent'))
self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'wait_for_files'))
+ mock.patch.object(dsaz, 'wait_for_files'))
self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
+ mock.patch.object(dsaz, 'list_possible_azure_ds_devs',
mock.MagicMock(return_value=[])))
self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric',
+ mock.patch.object(dsaz, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
self.patches.enter_context(
- mock.patch.object(DataSourceAzure.util, 'read_dmi_data',
+ mock.patch.object(dsaz.util, 'read_dmi_data',
mock.MagicMock(return_value='test-instance-id')))
def setUp(self):
@@ -478,13 +541,13 @@ class TestAzureBounce(TestCase):
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.addCleanup(shutil.rmtree, self.tmp)
- DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.patches = ExitStack()
self.mock_out_azure_moving_parts()
self.get_hostname = self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'get_hostname'))
+ mock.patch.object(dsaz, 'get_hostname'))
self.set_hostname = self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'set_hostname'))
+ mock.patch.object(dsaz, 'set_hostname'))
self.subp = self.patches.enter_context(
mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
@@ -495,7 +558,7 @@ class TestAzureBounce(TestCase):
if ovfcontent is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': ovfcontent})
- dsrc = DataSourceAzure.DataSourceAzureNet(
+ dsrc = dsaz.DataSourceAzureNet(
{}, distro=None, paths=self.paths)
if agent_command is not None:
dsrc.ds_cfg['agent_command'] = agent_command
@@ -608,7 +671,7 @@ class TestAzureBounce(TestCase):
def test_default_bounce_command_used_by_default(self):
cmd = 'default-bounce-command'
- DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ dsaz.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
cfg = {'hostname_bounce': {'policy': 'force'}}
data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
self._get_ds(data, agent_command=['not', '__builtin__']).get_data()
@@ -636,15 +699,208 @@ class TestAzureBounce(TestCase):
class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
- self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
- DataSourceAzure.read_azure_ovf, invalid_xml)
+ self.assertRaises(dsaz.BrokenAzureDataSource,
+ dsaz.read_azure_ovf, invalid_xml)
def test_load_with_pubkeys(self):
mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
content = construct_valid_ovf_env(pubkeys=pubkeys)
- (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
+ (_md, _ud, cfg) = dsaz.read_azure_ovf(content)
for mypk in mypklist:
self.assertIn(mypk, cfg['_pubkeys'])
+
+class TestCanDevBeReformatted(CiTestCase):
+ warning_file = 'dataloss_warning_readme.txt'
+
+ def _domock(self, mockpath, sattr=None):
+ patcher = mock.patch(mockpath)
+ setattr(self, sattr, patcher.start())
+ self.addCleanup(patcher.stop)
+
+ def setUp(self):
+ super(TestCanDevBeReformatted, self).setUp()
+
+ def patchup(self, devs):
+ bypath = {}
+ for path, data in devs.items():
+ bypath[path] = data
+ if 'realpath' in data:
+ bypath[data['realpath']] = data
+ for ppath, pdata in data.get('partitions', {}).items():
+ bypath[ppath] = pdata
+ if 'realpath' in data:
+ bypath[pdata['realpath']] = pdata
+
+ def realpath(d):
+ return bypath[d].get('realpath', d)
+
+ def partitions_on_device(devpath):
+ parts = bypath.get(devpath, {}).get('partitions', {})
+ ret = []
+ for path, data in parts.items():
+ ret.append((data.get('num'), realpath(path)))
+ # return sorted by partition number
+ return sorted(ret, key=lambda d: d[0])
+
+ def mount_cb(device, callback):
+ p = self.tmp_dir()
+ for f in bypath.get(device).get('files', []):
+ write_file(os.path.join(p, f), content=f)
+ return callback(p)
+
+ def has_ntfs_fs(device):
+ return bypath.get(device, {}).get('fs') == 'ntfs'
+
+ p = 'cloudinit.sources.DataSourceAzure'
+ self._domock(p + "._partitions_on_device", 'm_partitions_on_device')
+ self._domock(p + "._has_ntfs_filesystem", 'm_has_ntfs_filesystem')
+ self._domock(p + ".util.mount_cb", 'm_mount_cb')
+ self._domock(p + ".os.path.realpath", 'm_realpath')
+ self._domock(p + ".os.path.exists", 'm_exists')
+
+ self.m_exists.side_effect = lambda p: p in bypath
+ self.m_realpath.side_effect = realpath
+ self.m_has_ntfs_filesystem.side_effect = has_ntfs_fs
+ self.m_mount_cb.side_effect = mount_cb
+ self.m_partitions_on_device.side_effect = partitions_on_device
+
+ def test_three_partitions_is_false(self):
+ """A disk with 3 partitions can not be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1},
+ '/dev/sda2': {'num': 2},
+ '/dev/sda3': {'num': 3},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertFalse(False, value)
+ self.assertIn("3 or more", msg.lower())
+
+ def test_no_partitions_is_false(self):
+ """A disk with no partitions can not be formatted."""
+ self.patchup({'/dev/sda': {}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(False, value)
+ self.assertIn("not partitioned", msg.lower())
+
+ def test_two_partitions_not_ntfs_false(self):
+ """2 partitions and 2nd not ntfs can not be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1},
+ '/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertFalse(False, value)
+ self.assertIn("not ntfs", msg.lower())
+
+ def test_two_partitions_ntfs_populated_false(self):
+ """2 partitions and populated ntfs fs on 2nd can not be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1},
+ '/dev/sda2': {'num': 2, 'fs': 'ntfs',
+ 'files': ['secret.txt']},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertFalse(False, value)
+ self.assertIn("files on it", msg.lower())
+
+ def test_two_partitions_ntfs_empty_is_true(self):
+ """2 partitions and empty ntfs fs on 2nd can be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1},
+ '/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(True, value)
+ self.assertIn("safe for", msg.lower())
+
+ def test_one_partition_not_ntfs_false(self):
+ """1 partition witih fs other than ntfs can not be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1, 'fs': 'zfs'},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(False, value)
+ self.assertIn("not ntfs", msg.lower())
+
+ def test_one_partition_ntfs_populated_false(self):
+ """1 mountable ntfs partition with many files can not be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1, 'fs': 'ntfs',
+ 'files': ['file1.txt', 'file2.exe']},
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(False, value)
+ self.assertIn("files on it", msg.lower())
+
+ def test_one_partition_ntfs_empty_is_true(self):
+ """1 mountable ntfs partition and no files can be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(True, value)
+ self.assertIn("safe for", msg.lower())
+
+ def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self):
+ """1 mountable ntfs partition and only warn file can be formatted."""
+ self.patchup({
+ '/dev/sda': {
+ 'partitions': {
+ '/dev/sda1': {'num': 1, 'fs': 'ntfs',
+ 'files': ['dataloss_warning_readme.txt']}
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
+ self.assertEqual(True, value)
+ self.assertIn("safe for", msg.lower())
+
+ def test_one_partition_through_realpath_is_true(self):
+ """A symlink to a device with 1 ntfs partition can be formatted."""
+ epath = '/dev/disk/cloud/azure_resource'
+ self.patchup({
+ epath: {
+ 'realpath': '/dev/sdb',
+ 'partitions': {
+ epath + '-part1': {
+ 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file],
+ 'realpath': '/dev/sdb1'}
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted(epath)
+ self.assertEqual(True, value)
+ self.assertIn("safe for", msg.lower())
+
+ def test_three_partition_through_realpath_is_false(self):
+ """A symlink to a device with 3 partitions can not be formatted."""
+ epath = '/dev/disk/cloud/azure_resource'
+ self.patchup({
+ epath: {
+ 'realpath': '/dev/sdb',
+ 'partitions': {
+ epath + '-part1': {
+ 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file],
+ 'realpath': '/dev/sdb1'},
+ epath + '-part2': {'num': 2, 'fs': 'ext3',
+ 'realpath': '/dev/sdb2'},
+ epath + '-part3': {'num': 3, 'fs': 'ext',
+ 'realpath': '/dev/sdb3'}
+ }}})
+ value, msg = dsaz.can_dev_be_reformatted(epath)
+ self.assertEqual(False, value)
+ self.assertIn("3 or more", msg.lower())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index aafdebd7..b2d2971b 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -3,7 +3,6 @@
import os
from cloudinit.sources.helpers import azure as azure_helper
-
from ..helpers import ExitStack, mock, TestCase
@@ -72,10 +71,11 @@ class TestFindEndpoint(TestCase):
@staticmethod
def _build_lease_content(encoded_address):
+ endpoint = azure_helper._get_dhcp_endpoint_option_name()
return '\n'.join([
'lease {',
' interface "eth0";',
- ' option unknown-245 {0};'.format(encoded_address),
+ ' option {0} {1};'.format(endpoint, encoded_address),
'}'])
def test_from_dhcp_client(self):
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index e93d28de..e94aad61 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -15,6 +15,16 @@ class TestCloudStackPasswordFetching(TestCase):
mod_name = 'cloudinit.sources.DataSourceCloudStack'
self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name)))
self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name)))
+ default_gw = "192.201.20.0"
+ get_latest_lease = mock.MagicMock(return_value=None)
+ self.patches.enter_context(mock.patch(
+ 'cloudinit.sources.DataSourceCloudStack.get_latest_lease',
+ get_latest_lease))
+
+ get_default_gw = mock.MagicMock(return_value=default_gw)
+ self.patches.enter_context(mock.patch(
+ 'cloudinit.sources.DataSourceCloudStack.get_default_gateway',
+ get_default_gw))
def _set_password_server_response(self, response_string):
subp = mock.MagicMock(return_value=(response_string, ''))
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index a11166a9..e97a679a 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -1,6 +1,8 @@
# Copyright (C) 2014 Neal Shrader
#
# Author: Neal Shrader <neal@digitalocean.com>
+# Author: Ben Howard <bh@digitalocean.com>
+# Author: Scott Moser <smoser@ubuntu.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
@@ -262,6 +264,29 @@ class TestNetworkConvert(TestCase):
print(json.dumps(subn, indent=3))
return subn
+ def test_correct_gateways_defined(self):
+ """test to make sure the eth0 ipv4 and ipv6 gateways are defined"""
+ netcfg = self._get_networking()
+ gateways = []
+ for nic_def in netcfg.get('config'):
+ if nic_def.get('type') != 'physical':
+ continue
+ for subn in nic_def.get('subnets'):
+ if 'gateway' in subn:
+ gateways.append(subn.get('gateway'))
+
+ # we should have two gateways, one ipv4 and ipv6
+ self.assertEqual(len(gateways), 2)
+
+ # make that the ipv6 gateway is there
+ (nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
+ ipv4_def = meta_def.get('ipv4')
+ self.assertIn(ipv4_def.get('gateway'), gateways)
+
+ # make sure the the ipv6 gateway is there
+ ipv6_def = meta_def.get('ipv6')
+ self.assertIn(ipv6_def.get('gateway'), gateways)
+
def test_public_interface_defined(self):
"""test that the public interface is defined as eth0"""
(nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 3eaa58e3..6fd1341d 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -140,7 +140,7 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
def test_instance_level_ssh_keys_are_used(self):
key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+ meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
_set_mock_metadata(meta)
self.ds.get_data()
@@ -150,7 +150,7 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
def test_instance_level_keys_replace_project_level_keys(self):
key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+ meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
_set_mock_metadata(meta)
self.ds.get_data()
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 693882d2..c1911bf4 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -44,7 +44,7 @@ class TestMAASDataSource(TestCase):
# verify that 'userdata' is not returned as part of the metadata
self.assertFalse(('user-data' in md))
- self.assertEqual(vd, None)
+ self.assertIsNone(vd)
def test_seed_dir_valid_extra(self):
"""Verify extra files do not affect seed_dir validity."""
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index bce66125..b0f8e435 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -126,14 +126,14 @@ class TestOpenNebulaDataSource(TestCase):
populate_dir(self.seed_dir, {'context.sh': ''})
results = ds.read_context_disk_dir(self.seed_dir)
- self.assertEqual(results['userdata'], None)
+ self.assertIsNone(results['userdata'])
self.assertEqual(results['metadata'], {})
def test_seed_dir_empty2_context(self):
populate_context_dir(self.seed_dir, {})
results = ds.read_context_disk_dir(self.seed_dir)
- self.assertEqual(results['userdata'], None)
+ self.assertIsNone(results['userdata'])
self.assertEqual(results['metadata'], {})
def test_seed_dir_broken_context(self):
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 7bf55084..c2905d1a 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -242,7 +242,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
self.assertEqual(USER_DATA, ds_os.userdata_raw)
self.assertEqual(2, len(ds_os.files))
self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure)
- self.assertEqual(ds_os.vendordata_raw, None)
+ self.assertIsNone(ds_os.vendordata_raw)
@hp.activate
def test_bad_datasource_meta(self):
@@ -318,7 +318,7 @@ class TestVendorDataLoading(test_helpers.TestCase):
self.assertEqual(self.cvj(data), data)
def test_vd_load_dict_no_ci(self):
- self.assertEqual(self.cvj({'foo': 'bar'}), None)
+ self.assertIsNone(self.cvj({'foo': 'bar'}))
def test_vd_load_dict_ci_dict(self):
self.assertRaises(ValueError, self.cvj,
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 3e09510c..477cf8ed 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -68,6 +68,6 @@ class TestReadOvfEnv(test_helpers.TestCase):
md, ud, cfg = dsovf.read_ovf_environment(env)
self.assertEqual({"instance-id": "inst-001"}, md)
self.assertEqual({'password': "passw0rd"}, cfg)
- self.assertEqual(None, ud)
+ self.assertIsNone(ud)
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index 88370669..be9a8318 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -127,7 +127,7 @@ network:
ethernets:
eth0:
addresses:
- - 192.168.1.5/255.255.255.0
+ - 192.168.1.5/24
gateway4: 192.168.1.254
eth1:
dhcp4: true
@@ -178,6 +178,20 @@ class WriteBuffer(object):
class TestNetCfgDistro(TestCase):
+ frbsd_ifout = """\
+hn0: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> metric 0 mtu 1500
+ options=51b<RXCSUM,TXCSUM,VLAN_MTU,VLAN_HWTAGGING,TSO4,LRO>
+ ether 00:15:5d:4c:73:00
+ inet6 fe80::215:5dff:fe4c:7300%hn0 prefixlen 64 scopeid 0x2
+ inet 10.156.76.127 netmask 0xfffffc00 broadcast 10.156.79.255
+ nd6 options=23<PERFORMNUD,ACCEPT_RTADV,AUTO_LINKLOCAL>
+ media: Ethernet autoselect (10Gbase-T <full-duplex>)
+ status: active
+"""
+
+ def setUp(self):
+ super(TestNetCfgDistro, self).setUp()
+
def _get_distro(self, dname, renderers=None):
cls = distros.fetch(dname)
cfg = settings.CFG_BUILTIN
@@ -251,6 +265,7 @@ class TestNetCfgDistro(TestCase):
def test_apply_network_config_v1_to_netplan_ub(self):
renderers = ['netplan']
+ devlist = ['eth0', 'lo']
ub_distro = self._get_distro('ubuntu', renderers=renderers)
with ExitStack() as mocks:
write_bufs = {}
@@ -272,6 +287,9 @@ class TestNetCfgDistro(TestCase):
mock.patch.object(util, 'subp', return_value=(0, 0)))
mocks.enter_context(
mock.patch.object(os.path, 'isfile', return_value=False))
+ mocks.enter_context(
+ mock.patch("cloudinit.net.netplan.get_devicelist",
+ return_value=devlist))
ub_distro.apply_network_config(V1_NET_CFG, False)
@@ -285,6 +303,7 @@ class TestNetCfgDistro(TestCase):
def test_apply_network_config_v2_passthrough_ub(self):
renderers = ['netplan']
+ devlist = ['eth0', 'lo']
ub_distro = self._get_distro('ubuntu', renderers=renderers)
with ExitStack() as mocks:
write_bufs = {}
@@ -306,7 +325,10 @@ class TestNetCfgDistro(TestCase):
mock.patch.object(util, 'subp', return_value=(0, 0)))
mocks.enter_context(
mock.patch.object(os.path, 'isfile', return_value=False))
-
+ # FreeBSD does not have the '/sys/class/net' directory,
+ # so we need to mock it here.
+ mocks.enter_context(
+ mock.patch.object(os, 'listdir', return_value=devlist))
ub_distro.apply_network_config(V2_NET_CFG, False)
self.assertEqual(len(write_bufs), 1)
@@ -328,6 +350,29 @@ class TestNetCfgDistro(TestCase):
for (k, v) in b1.items():
self.assertEqual(v, b2[k])
+ @mock.patch('cloudinit.distros.freebsd.Distro.get_ifconfig_list')
+ @mock.patch('cloudinit.distros.freebsd.Distro.get_ifconfig_ifname_out')
+ def test_get_ip_nic_freebsd(self, ifname_out, iflist):
+ frbsd_distro = self._get_distro('freebsd')
+ iflist.return_value = "lo0 hn0"
+ ifname_out.return_value = self.frbsd_ifout
+ res = frbsd_distro.get_ipv4()
+ self.assertEqual(res, ['lo0', 'hn0'])
+ res = frbsd_distro.get_ipv6()
+ self.assertEqual(res, [])
+
+ @mock.patch('cloudinit.distros.freebsd.Distro.get_ifconfig_ether')
+ @mock.patch('cloudinit.distros.freebsd.Distro.get_ifconfig_ifname_out')
+ @mock.patch('cloudinit.distros.freebsd.Distro.get_interface_mac')
+ def test_generate_fallback_config_freebsd(self, mac, ifname_out, if_ether):
+ frbsd_distro = self._get_distro('freebsd')
+
+ if_ether.return_value = 'hn0'
+ ifname_out.return_value = self.frbsd_ifout
+ mac.return_value = '00:15:5d:4c:73:00'
+ res = frbsd_distro.generate_fallback_config()
+ self.assertIsNotNone(res)
+
def test_simple_write_rh(self):
rh_distro = self._get_distro('rhel')
@@ -431,7 +476,7 @@ NETWORKING=yes
expected_buf = '''
# Created by cloud-init on instance boot automatically, do not edit.
#
-BOOTPROTO=static
+BOOTPROTO=none
DEVICE=eth0
IPADDR=192.168.1.5
NETMASK=255.255.255.0
@@ -488,7 +533,6 @@ NETWORKING=yes
mock.patch.object(util, 'load_file', return_value=''))
mocks.enter_context(
mock.patch.object(os.path, 'isfile', return_value=False))
-
rh_distro.apply_network(BASE_NET_CFG_IPV6, False)
self.assertEqual(len(write_bufs), 4)
@@ -581,11 +625,10 @@ IPV6_AUTOCONF=no
expected_buf = '''
# Created by cloud-init on instance boot automatically, do not edit.
#
-BOOTPROTO=static
+BOOTPROTO=none
DEVICE=eth0
-IPV6ADDR=2607:f0d0:1002:0011::2
+IPV6ADDR=2607:f0d0:1002:0011::2/64
IPV6INIT=yes
-NETMASK=64
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py
index c9d03475..97168cf9 100644
--- a/tests/unittests/test_distros/test_resolv.py
+++ b/tests/unittests/test_distros/test_resolv.py
@@ -30,7 +30,7 @@ class TestResolvHelper(TestCase):
def test_local_domain(self):
rp = resolv_conf.ResolvConf(BASE_RESOLVE)
- self.assertEqual(None, rp.local_domain)
+ self.assertIsNone(rp.local_domain)
rp.local_domain = "bob"
self.assertEqual('bob', rp.local_domain)
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
new file mode 100644
index 00000000..f5694b26
--- /dev/null
+++ b/tests/unittests/test_ds_identify.py
@@ -0,0 +1,300 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import os
+from uuid import uuid4
+
+from cloudinit import safeyaml
+from cloudinit import util
+from .helpers import CiTestCase, dir2dict, json_dumps, populate_dir
+
+UNAME_MYSYS = ("Linux bart 4.4.0-62-generic #83-Ubuntu "
+ "SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux")
+BLKID_EFI_ROOT = """
+DEVNAME=/dev/sda1
+UUID=8B36-5390
+TYPE=vfat
+PARTUUID=30d7c715-a6ae-46ee-b050-afc6467fc452
+
+DEVNAME=/dev/sda2
+UUID=19ac97d5-6973-4193-9a09-2e6bbfa38262
+TYPE=ext4
+PARTUUID=30c65c77-e07d-4039-b2fb-88b1fb5fa1fc
+"""
+
+DI_DEFAULT_POLICY = "search,found=all,maybe=all,notfound=enabled"
+DI_DEFAULT_POLICY_NO_DMI = "search,found=all,maybe=all,notfound=disabled"
+
+SHELL_MOCK_TMPL = """\
+%(name)s() {
+ local out='%(out)s' err='%(err)s' r='%(ret)s' RET='%(RET)s'
+ [ "$out" = "_unset" ] || echo "$out"
+ [ "$err" = "_unset" ] || echo "$err" 2>&1
+ [ "$RET" = "_unset" ] || _RET="$RET"
+ return $r
+}
+"""
+
+RC_FOUND = 0
+RC_NOT_FOUND = 1
+DS_NONE = 'None'
+
+P_PRODUCT_NAME = "sys/class/dmi/id/product_name"
+P_PRODUCT_SERIAL = "sys/class/dmi/id/product_serial"
+P_PRODUCT_UUID = "sys/class/dmi/id/product_uuid"
+P_DSID_CFG = "etc/cloud/ds-identify.cfg"
+
+MOCK_VIRT_IS_KVM = {'name': 'detect_virt', 'RET': 'kvm', 'ret': 0}
+
+
+class TestDsIdentify(CiTestCase):
+ dsid_path = os.path.realpath('tools/ds-identify')
+
+ def call(self, rootd=None, mocks=None, args=None, files=None,
+ policy_dmi=DI_DEFAULT_POLICY,
+ policy_nodmi=DI_DEFAULT_POLICY_NO_DMI):
+ if args is None:
+ args = []
+ if mocks is None:
+ mocks = []
+
+ if files is None:
+ files = {}
+
+ if rootd is None:
+ rootd = self.tmp_dir()
+
+ unset = '_unset'
+ wrap = self.tmp_path(path="_shwrap", dir=rootd)
+ populate_dir(rootd, files)
+
+ # DI_DEFAULT_POLICY* are always declared so as not to rely
+ # on the default in the code. This is because SRU releases change
+ # the value in the code, and thus tests would fail there.
+ head = [
+ "DI_MAIN=noop",
+ "DEBUG_LEVEL=2",
+ "DI_LOG=stderr",
+ "PATH_ROOT='%s'" % rootd,
+ ". " + self.dsid_path,
+ 'DI_DEFAULT_POLICY="%s"' % policy_dmi,
+ 'DI_DEFAULT_POLICY_NO_DMI="%s"' % policy_nodmi,
+ ""
+ ]
+
+ def write_mock(data):
+ ddata = {'out': None, 'err': None, 'ret': 0, 'RET': None}
+ ddata.update(data)
+ for k in ddata:
+ if ddata[k] is None:
+ ddata[k] = unset
+ return SHELL_MOCK_TMPL % ddata
+
+ mocklines = []
+ defaults = [
+ {'name': 'detect_virt', 'RET': 'none', 'ret': 1},
+ {'name': 'uname', 'out': UNAME_MYSYS},
+ {'name': 'blkid', 'out': BLKID_EFI_ROOT},
+ ]
+
+ written = [d['name'] for d in mocks]
+ for data in mocks:
+ mocklines.append(write_mock(data))
+ for d in defaults:
+ if d['name'] not in written:
+ mocklines.append(write_mock(d))
+
+ endlines = [
+ 'main %s' % ' '.join(['"%s"' % s for s in args])
+ ]
+
+ with open(wrap, "w") as fp:
+ fp.write('\n'.join(head + mocklines + endlines) + "\n")
+
+ rc = 0
+ try:
+ out, err = util.subp(['sh', '-c', '. %s' % wrap], capture=True)
+ except util.ProcessExecutionError as e:
+ rc = e.exit_code
+ out = e.stdout
+ err = e.stderr
+
+ cfg = None
+ cfg_out = os.path.join(rootd, 'run/cloud-init/cloud.cfg')
+ if os.path.exists(cfg_out):
+ contents = util.load_file(cfg_out)
+ try:
+ cfg = safeyaml.load(contents)
+ except Exception as e:
+ cfg = {"_INVALID_YAML": contents,
+ "_EXCEPTION": str(e)}
+
+ return rc, out, err, cfg, dir2dict(rootd)
+
+ def _call_via_dict(self, data, rootd=None, **kwargs):
+ # return output of self.call with a dict input like VALID_CFG[item]
+ xwargs = {'rootd': rootd}
+ for k in ('mocks', 'args', 'policy_dmi', 'policy_nodmi', 'files'):
+ if k in data:
+ xwargs[k] = data[k]
+ if k in kwargs:
+ xwargs[k] = kwargs[k]
+
+ return self.call(**xwargs)
+
+ def _test_ds_found(self, name):
+ data = copy.deepcopy(VALID_CFG[name])
+ return self._check_via_dict(
+ data, RC_FOUND, dslist=[data.get('ds'), DS_NONE])
+
+ def _check_via_dict(self, data, rc, dslist=None, **kwargs):
+ found_rc, out, err, cfg, files = self._call_via_dict(data, **kwargs)
+ good = False
+ try:
+ self.assertEqual(rc, found_rc)
+ if dslist is not None:
+ self.assertEqual(dslist, cfg['datasource_list'])
+ good = True
+ finally:
+ if not good:
+ _print_run_output(rc, out, err, cfg, files)
+ return rc, out, err, cfg, files
+
+ def test_aws_ec2_hvm(self):
+ """EC2: hvm instances use dmi serial and uuid starting with 'ec2'."""
+ self._test_ds_found('Ec2-hvm')
+
+ def test_aws_ec2_xen(self):
+ """EC2: sys/hypervisor/uuid starts with ec2."""
+ self._test_ds_found('Ec2-xen')
+
+ def test_brightbox_is_ec2(self):
+ """EC2: product_serial ends with 'brightbox.com'"""
+ self._test_ds_found('Ec2-brightbox')
+
+ def test_gce_by_product_name(self):
+ """GCE identifies itself with product_name."""
+ self._test_ds_found('GCE')
+
+ def test_gce_by_serial(self):
+ """Older gce compute instances must be identified by serial."""
+ self._test_ds_found('GCE-serial')
+
+ def test_config_drive(self):
+ """ConfigDrive datasource has a disk with LABEL=config-2."""
+ self._test_ds_found('ConfigDrive')
+ return
+
+ def test_policy_disabled(self):
+ """A Builtin policy of 'disabled' should return not found.
+
+ Even though a search would find something, the builtin policy of
+ disabled should cause the return of not found."""
+ mydata = copy.deepcopy(VALID_CFG['Ec2-hvm'])
+ self._check_via_dict(mydata, rc=RC_NOT_FOUND, policy_dmi="disabled")
+
+ def test_policy_config_disable_overrides_builtin(self):
+ """explicit policy: disabled in config file should cause not found."""
+ mydata = copy.deepcopy(VALID_CFG['Ec2-hvm'])
+ mydata['files'][P_DSID_CFG] = '\n'.join(['policy: disabled', ''])
+ self._check_via_dict(mydata, rc=RC_NOT_FOUND)
+
+ def test_single_entry_defines_datasource(self):
+ """If config has a single entry in datasource_list, that is used.
+
+ Test the valid Ec2-hvm, but provide a config file that specifies
+ a single entry in datasource_list. The configured value should
+ be used."""
+ mydata = copy.deepcopy(VALID_CFG['Ec2-hvm'])
+ cfgpath = 'etc/cloud/cloud.cfg.d/myds.cfg'
+ mydata['files'][cfgpath] = 'datasource_list: ["NoCloud"]\n'
+ self._check_via_dict(mydata, rc=RC_FOUND, dslist=['NoCloud', DS_NONE])
+
+ def test_configured_list_with_none(self):
+ """When datasource_list already contains None, None is not added.
+
+ The explicitly configured datasource_list has 'None' in it. That
+ should not have None automatically added."""
+ mydata = copy.deepcopy(VALID_CFG['GCE'])
+ cfgpath = 'etc/cloud/cloud.cfg.d/myds.cfg'
+ mydata['files'][cfgpath] = 'datasource_list: ["Ec2", "None"]\n'
+ self._check_via_dict(mydata, rc=RC_FOUND, dslist=['Ec2', DS_NONE])
+
+
+def blkid_out(disks=None):
+ """Convert a list of disk dictionaries into blkid content."""
+ if disks is None:
+ disks = []
+ lines = []
+ for disk in disks:
+ if not disk["DEVNAME"].startswith("/dev/"):
+ disk["DEVNAME"] = "/dev/" + disk["DEVNAME"]
+ for key in disk:
+ lines.append("%s=%s" % (key, disk[key]))
+ lines.append("")
+ return '\n'.join(lines)
+
+
+def _print_run_output(rc, out, err, cfg, files):
+ """A helper to print return of TestDsIdentify.
+
+ _print_run_output(self.call())"""
+ print('\n'.join([
+ '-- rc = %s --' % rc,
+ '-- out --', str(out),
+ '-- err --', str(err),
+ '-- cfg --', json_dumps(cfg)]))
+ print('-- files --')
+ for k, v in files.items():
+ if "/_shwrap" in k:
+ continue
+ print(' === %s ===' % k)
+ for line in v.splitlines():
+ print(" " + line)
+
+
+VALID_CFG = {
+ 'Ec2-hvm': {
+ 'ds': 'Ec2',
+ 'mocks': [{'name': 'detect_virt', 'RET': 'kvm', 'ret': 0}],
+ 'files': {
+ P_PRODUCT_SERIAL: 'ec23aef5-54be-4843-8d24-8c819f88453e\n',
+ P_PRODUCT_UUID: 'EC23AEF5-54BE-4843-8D24-8C819F88453E\n',
+ }
+ },
+ 'Ec2-xen': {
+ 'ds': 'Ec2',
+ 'mocks': [{'name': 'detect_virt', 'RET': 'xen', 'ret': 0}],
+ 'files': {
+ 'sys/hypervisor/uuid': 'ec2c6e2f-5fac-4fc7-9c82-74127ec14bbb\n'
+ },
+ },
+ 'Ec2-brightbox': {
+ 'ds': 'Ec2',
+ 'files': {P_PRODUCT_SERIAL: 'facc6e2f.brightbox.com\n'},
+ },
+ 'GCE': {
+ 'ds': 'GCE',
+ 'files': {P_PRODUCT_NAME: 'Google Compute Engine\n'},
+ 'mocks': [MOCK_VIRT_IS_KVM],
+ },
+ 'GCE-serial': {
+ 'ds': 'GCE',
+ 'files': {P_PRODUCT_SERIAL: 'GoogleCloud-8f2e88f\n'},
+ 'mocks': [MOCK_VIRT_IS_KVM],
+ },
+ 'ConfigDrive': {
+ 'ds': 'ConfigDrive',
+ 'mocks': [
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ [{'DEVNAME': 'vda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'vda2', 'TYPE': 'ext4',
+ 'LABEL': 'cloudimg-rootfs', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'vdb', 'TYPE': 'vfat', 'LABEL': 'config-2'}])
+ },
+ ],
+ },
+}
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
index 24e45233..1ca915b4 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
@@ -121,39 +121,82 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
myds.metadata.update(metadata)
return cloud.Cloud(myds, paths, {}, mydist, None)
- def _apt_source_list(self, cfg, expected, distro):
- "_apt_source_list - Test rendering from template (generic)"
-
+ def _apt_source_list(self, distro, cfg, cfg_on_empty=False):
+ """_apt_source_list - Test rendering from template (generic)"""
# entry at top level now, wrap in 'apt' key
cfg = {'apt': cfg}
mycloud = self._get_cloud(distro)
- with mock.patch.object(util, 'write_file') as mockwf:
+
+ with mock.patch.object(util, 'write_file') as mock_writefile:
with mock.patch.object(util, 'load_file',
- return_value=MOCKED_APT_SRC_LIST) as mocklf:
+ return_value=MOCKED_APT_SRC_LIST
+ ) as mock_loadfile:
with mock.patch.object(os.path, 'isfile',
- return_value=True) as mockisfile:
- with mock.patch.object(util, 'rename'):
- cc_apt_configure.handle("test", cfg, mycloud,
- LOG, None)
-
- # check if it would have loaded the distro template
- mockisfile.assert_any_call(
- ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
- mocklf.assert_any_call(
- ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
- # check expected content in result
- mockwf.assert_called_once_with('/etc/apt/sources.list', expected,
- mode=0o644)
+ return_value=True) as mock_isfile:
+ cfg_func = ('cloudinit.config.cc_apt_configure.' +
+ '_should_configure_on_empty_apt')
+ with mock.patch(cfg_func,
+ return_value=(cfg_on_empty, "test")
+ ) as mock_shouldcfg:
+ cc_apt_configure.handle("test", cfg, mycloud, LOG,
+ None)
+
+ return mock_writefile, mock_loadfile, mock_isfile, mock_shouldcfg
def test_apt_v3_source_list_debian(self):
"""test_apt_v3_source_list_debian - without custom sources or parms"""
cfg = {}
- self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'debian')
+ distro = 'debian'
+ expected = EXPECTED_BASE_CONTENT
+
+ mock_writefile, mock_load_file, mock_isfile, mock_shouldcfg = (
+ self._apt_source_list(distro, cfg, cfg_on_empty=True))
+
+ template = '/etc/cloud/templates/sources.list.%s.tmpl' % distro
+ mock_writefile.assert_called_once_with('/etc/apt/sources.list',
+ expected, mode=0o644)
+ mock_load_file.assert_called_with(template)
+ mock_isfile.assert_any_call(template)
+ self.assertEqual(1, mock_shouldcfg.call_count)
def test_apt_v3_source_list_ubuntu(self):
"""test_apt_v3_source_list_ubuntu - without custom sources or parms"""
cfg = {}
- self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'ubuntu')
+ distro = 'ubuntu'
+ expected = EXPECTED_BASE_CONTENT
+
+ mock_writefile, mock_load_file, mock_isfile, mock_shouldcfg = (
+ self._apt_source_list(distro, cfg, cfg_on_empty=True))
+
+ template = '/etc/cloud/templates/sources.list.%s.tmpl' % distro
+ mock_writefile.assert_called_once_with('/etc/apt/sources.list',
+ expected, mode=0o644)
+ mock_load_file.assert_called_with(template)
+ mock_isfile.assert_any_call(template)
+ self.assertEqual(1, mock_shouldcfg.call_count)
+
+ def test_apt_v3_source_list_ubuntu_snappy(self):
+ """test_apt_v3_source_list_ubuntu_snappy - without custom sources or
+ parms"""
+ cfg = {'apt': {}}
+ mycloud = self._get_cloud('ubuntu')
+
+ with mock.patch.object(util, 'write_file') as mock_writefile:
+ with mock.patch.object(util, 'system_is_snappy',
+ return_value=True) as mock_issnappy:
+ cc_apt_configure.handle("test", cfg, mycloud, LOG, None)
+
+ self.assertEqual(0, mock_writefile.call_count)
+ self.assertEqual(1, mock_issnappy.call_count)
+
+ def test_apt_v3_source_list_centos(self):
+ """test_apt_v3_source_list_centos - without custom sources or parms"""
+ cfg = {}
+ distro = 'rhel'
+
+ mock_writefile, _, _, _ = self._apt_source_list(distro, cfg)
+
+ self.assertEqual(0, mock_writefile.call_count)
def test_apt_v3_source_list_psm(self):
"""test_apt_v3_source_list_psm - Test specifying prim+sec mirrors"""
@@ -164,8 +207,17 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
'uri': pm}],
'security': [{'arches': ["default"],
'uri': sm}]}
+ distro = 'ubuntu'
+ expected = EXPECTED_PRIMSEC_CONTENT
+
+ mock_writefile, mock_load_file, mock_isfile, _ = (
+ self._apt_source_list(distro, cfg, cfg_on_empty=True))
- self._apt_source_list(cfg, EXPECTED_PRIMSEC_CONTENT, 'ubuntu')
+ template = '/etc/cloud/templates/sources.list.%s.tmpl' % distro
+ mock_writefile.assert_called_once_with('/etc/apt/sources.list',
+ expected, mode=0o644)
+ mock_load_file.assert_called_with(template)
+ mock_isfile.assert_any_call(template)
def test_apt_v3_srcl_custom(self):
"""test_apt_v3_srcl_custom - Test rendering a custom source template"""
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
index 7ff39225..916a0d7a 100644
--- a/tests/unittests/test_handler/test_handler_disk_setup.py
+++ b/tests/unittests/test_handler/test_handler_disk_setup.py
@@ -17,6 +17,10 @@ class TestIsDiskUsed(TestCase):
self.check_fs = self.patches.enter_context(
mock.patch('{0}.check_fs'.format(mod_name)))
+ def tearDown(self):
+ super(TestIsDiskUsed, self).tearDown()
+ self.patches.close()
+
def test_multiple_child_nodes_returns_true(self):
self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
@@ -62,7 +66,7 @@ class TestGetMbrHddSize(TestCase):
size_in_sectors = size_in_bytes / sector_size
self._configure_subp_mock(size_in_bytes, sector_size)
self.assertEqual(size_in_sectors,
- cc_disk_setup.get_mbr_hdd_size('/dev/sda1'))
+ cc_disk_setup.get_hdd_size('/dev/sda1'))
def test_size_for_512_byte_sectors(self):
self._test_for_sector_size(512)
@@ -147,4 +151,75 @@ class TestUpdateFsSetupDevices(TestCase):
'filesystem': 'xfs'
}, fs_setup)
+ def test_dotted_devname_populates_partition(self):
+ fs_setup = {
+ 'device': 'ephemeral0.1',
+ 'label': 'test2',
+ 'filesystem': 'xfs'
+ }
+ cc_disk_setup.update_fs_setup_devices([fs_setup],
+ lambda device: device)
+ self.assertEqual({
+ '_origname': 'ephemeral0.1',
+ 'device': 'ephemeral0',
+ 'partition': '1',
+ 'label': 'test2',
+ 'filesystem': 'xfs'
+ }, fs_setup)
+
+
+@mock.patch('cloudinit.config.cc_disk_setup.assert_and_settle_device',
+ return_value=None)
+@mock.patch('cloudinit.config.cc_disk_setup.find_device_node',
+ return_value=('/dev/xdb1', False))
+@mock.patch('cloudinit.config.cc_disk_setup.device_type', return_value=None)
+@mock.patch('cloudinit.config.cc_disk_setup.util.subp', return_value=('', ''))
+class TestMkfsCommandHandling(TestCase):
+
+ def test_with_cmd(self, subp, *args):
+ """mkfs honors cmd and logs warnings when extra_opts or overwrite are
+ provided."""
+ with self.assertLogs(
+ 'cloudinit.config.cc_disk_setup') as logs:
+ cc_disk_setup.mkfs({
+ 'cmd': 'mkfs -t %(filesystem)s -L %(label)s %(device)s',
+ 'filesystem': 'ext4',
+ 'device': '/dev/xdb1',
+ 'label': 'with_cmd',
+ 'extra_opts': ['should', 'generate', 'warning'],
+ 'overwrite': 'should generate warning too'
+ })
+
+ self.assertIn(
+ 'WARNING:cloudinit.config.cc_disk_setup:fs_setup:extra_opts ' +
+ 'ignored because cmd was specified: mkfs -t ext4 -L with_cmd ' +
+ '/dev/xdb1',
+ logs.output)
+ self.assertIn(
+ 'WARNING:cloudinit.config.cc_disk_setup:fs_setup:overwrite ' +
+ 'ignored because cmd was specified: mkfs -t ext4 -L with_cmd ' +
+ '/dev/xdb1',
+ logs.output)
+
+ subp.assert_called_once_with(
+ 'mkfs -t ext4 -L with_cmd /dev/xdb1', shell=True)
+
+ @mock.patch('cloudinit.config.cc_disk_setup.util.which')
+ def test_overwrite_and_extra_opts_without_cmd(self, m_which, subp, *args):
+ """mkfs observes extra_opts and overwrite settings when cmd is not
+ present."""
+ m_which.side_effect = lambda p: {'mkfs.ext4': '/sbin/mkfs.ext4'}[p]
+ cc_disk_setup.mkfs({
+ 'filesystem': 'ext4',
+ 'device': '/dev/xdb1',
+ 'label': 'without_cmd',
+ 'extra_opts': ['are', 'added'],
+ 'overwrite': True
+ })
+
+ subp.assert_called_once_with(
+ ['/sbin/mkfs.ext4', '/dev/xdb1',
+ '-L', 'without_cmd', '-F', 'are', 'added'],
+ shell=False)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
index ec600077..bc4277b7 100644
--- a/tests/unittests/test_handler/test_handler_ntp.py
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -2,277 +2,214 @@
from cloudinit.config import cc_ntp
from cloudinit.sources import DataSourceNone
-from cloudinit import templater
from cloudinit import (distros, helpers, cloud, util)
from ..helpers import FilesystemMockingTestCase, mock
-import logging
+
import os
+from os.path import dirname
import shutil
-import tempfile
-
-LOG = logging.getLogger(__name__)
-NTP_TEMPLATE = """
+NTP_TEMPLATE = b"""\
## template: jinja
-
-{% if pools %}# pools
-{% endif %}
-{% for pool in pools -%}
-pool {{pool}} iburst
-{% endfor %}
-{%- if servers %}# servers
-{% endif %}
-{% for server in servers -%}
-server {{server}} iburst
-{% endfor %}
-
-"""
-
-
-NTP_EXPECTED_UBUNTU = """
-# pools
-pool 0.mycompany.pool.ntp.org iburst
-# servers
-server 192.168.23.3 iburst
-
+servers {{servers}}
+pools {{pools}}
"""
class TestNtp(FilesystemMockingTestCase):
+ with_logs = True
+
def setUp(self):
super(TestNtp, self).setUp()
self.subp = util.subp
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
+ self.new_root = self.tmp_dir()
- def _get_cloud(self, distro, metadata=None):
+ def _get_cloud(self, distro):
self.patchUtils(self.new_root)
- paths = helpers.Paths({})
+ paths = helpers.Paths({'templates_dir': self.new_root})
cls = distros.fetch(distro)
mydist = cls(distro, {}, paths)
myds = DataSourceNone.DataSourceNone({}, mydist, paths)
- if metadata:
- myds.metadata.update(metadata)
return cloud.Cloud(myds, paths, {}, mydist, None)
@mock.patch("cloudinit.config.cc_ntp.util")
def test_ntp_install(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- cc.distro.name = 'ubuntu'
- mock_util.which.return_value = None
+ """ntp_install installs via install_func when check_exe is absent."""
+ mock_util.which.return_value = None # check_exe not found.
install_func = mock.MagicMock()
-
cc_ntp.install_ntp(install_func, packages=['ntpx'], check_exe='ntpdx')
- self.assertTrue(install_func.called)
mock_util.which.assert_called_with('ntpdx')
- install_pkg = install_func.call_args_list[0][0][0]
- self.assertEqual(sorted(install_pkg), ['ntpx'])
+ install_func.assert_called_once_with(['ntpx'])
@mock.patch("cloudinit.config.cc_ntp.util")
def test_ntp_install_not_needed(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- cc.distro.name = 'ubuntu'
- mock_util.which.return_value = ["/usr/sbin/ntpd"]
- cc_ntp.install_ntp(cc)
- self.assertFalse(cc.distro.install_packages.called)
+ """ntp_install doesn't attempt install when check_exe is found."""
+ mock_util.which.return_value = ["/usr/sbin/ntpd"] # check_exe found.
+ install_func = mock.MagicMock()
+ cc_ntp.install_ntp(install_func, packages=['ntp'], check_exe='ntpd')
+ install_func.assert_not_called()
def test_ntp_rename_ntp_conf(self):
- with mock.patch.object(os.path, 'exists',
- return_value=True) as mockpath:
- with mock.patch.object(util, 'rename') as mockrename:
- cc_ntp.rename_ntp_conf()
-
- mockpath.assert_called_with('/etc/ntp.conf')
- mockrename.assert_called_with('/etc/ntp.conf', '/etc/ntp.conf.dist')
+ """When NTP_CONF exists, rename_ntp_conf moves it."""
+ ntpconf = self.tmp_path("ntp.conf", self.new_root)
+ os.mknod(ntpconf)
+ with mock.patch("cloudinit.config.cc_ntp.NTP_CONF", ntpconf):
+ cc_ntp.rename_ntp_conf()
+ self.assertFalse(os.path.exists(ntpconf))
+ self.assertTrue(os.path.exists("{0}.dist".format(ntpconf)))
def test_ntp_rename_ntp_conf_skip_missing(self):
- with mock.patch.object(os.path, 'exists',
- return_value=False) as mockpath:
- with mock.patch.object(util, 'rename') as mockrename:
- cc_ntp.rename_ntp_conf()
-
- mockpath.assert_called_with('/etc/ntp.conf')
- mockrename.assert_not_called()
-
- def ntp_conf_render(self, distro):
- """ntp_conf_render
- Test rendering of a ntp.conf from template for a given distro
+ """When NTP_CONF doesn't exist rename_ntp doesn't create a file."""
+ ntpconf = self.tmp_path("ntp.conf", self.new_root)
+ self.assertFalse(os.path.exists(ntpconf))
+ with mock.patch("cloudinit.config.cc_ntp.NTP_CONF", ntpconf):
+ cc_ntp.rename_ntp_conf()
+ self.assertFalse(os.path.exists("{0}.dist".format(ntpconf)))
+ self.assertFalse(os.path.exists(ntpconf))
+
+ def test_write_ntp_config_template_from_ntp_conf_tmpl_with_servers(self):
+ """write_ntp_config_template reads content from ntp.conf.tmpl.
+
+ It reads ntp.conf.tmpl if present and renders the value from servers
+ key. When no pools key is defined, template is rendered using an empty
+ list for pools.
"""
-
- cfg = {'ntp': {}}
- mycloud = self._get_cloud(distro)
- distro_names = cc_ntp.generate_server_names(distro)
-
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile', return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.write_ntp_config_template(cfg, mycloud)
-
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
- '/etc/ntp.conf',
- {'servers': [], 'pools': distro_names})
-
- def test_ntp_conf_render_rhel(self):
- """Test templater.render_to_file() for rhel"""
- self.ntp_conf_render('rhel')
-
- def test_ntp_conf_render_debian(self):
- """Test templater.render_to_file() for debian"""
- self.ntp_conf_render('debian')
-
- def test_ntp_conf_render_fedora(self):
- """Test templater.render_to_file() for fedora"""
- self.ntp_conf_render('fedora')
-
- def test_ntp_conf_render_sles(self):
- """Test templater.render_to_file() for sles"""
- self.ntp_conf_render('sles')
-
- def test_ntp_conf_render_ubuntu(self):
- """Test templater.render_to_file() for ubuntu"""
- self.ntp_conf_render('ubuntu')
-
- def test_ntp_conf_servers_no_pools(self):
distro = 'ubuntu'
- pools = []
- servers = ['192.168.2.1']
cfg = {
- 'ntp': {
- 'pools': pools,
- 'servers': servers,
- }
+ 'servers': ['192.168.2.1', '192.168.2.2']
}
mycloud = self._get_cloud(distro)
-
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile', return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
-
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
- '/etc/ntp.conf',
- {'servers': servers, 'pools': pools})
-
- def test_ntp_conf_custom_pools_no_server(self):
+ ntp_conf = self.tmp_path("ntp.conf", self.new_root) # Doesn't exist
+ # Create ntp.conf.tmpl
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.write_ntp_config_template(cfg, mycloud)
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ self.assertEqual(
+ "servers ['192.168.2.1', '192.168.2.2']\npools []\n",
+ content.decode())
+
+ def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers(self):
+ """write_ntp_config_template reads content from ntp.conf.distro.tmpl.
+
+ It reads ntp.conf.<distro>.tmpl before attempting ntp.conf.tmpl. It
+ renders the value from the keys servers and pools. When no
+ servers value is present, template is rendered using an empty list.
+ """
distro = 'ubuntu'
- pools = ['0.mycompany.pool.ntp.org']
- servers = []
cfg = {
- 'ntp': {
- 'pools': pools,
- 'servers': servers,
- }
+ 'pools': ['10.0.0.1', '10.0.0.2']
}
mycloud = self._get_cloud(distro)
-
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile', return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
-
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
- '/etc/ntp.conf',
- {'servers': servers, 'pools': pools})
-
- def test_ntp_conf_custom_pools_and_server(self):
+ ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ # Create ntp.conf.tmpl which isn't read
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(b'NOT READ: ntp.conf.<distro>.tmpl is primary')
+ # Create ntp.conf.tmpl.<distro>
+ with open('{0}.{1}.tmpl'.format(ntp_conf, distro), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.write_ntp_config_template(cfg, mycloud)
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ self.assertEqual(
+ "servers []\npools ['10.0.0.1', '10.0.0.2']\n",
+ content.decode())
+
+ def test_write_ntp_config_template_defaults_pools_when_empty_lists(self):
+ """write_ntp_config_template defaults pools servers upon empty config.
+
+ When both pools and servers are empty, default NR_POOL_SERVERS get
+ configured.
+ """
distro = 'ubuntu'
- pools = ['0.mycompany.pool.ntp.org']
- servers = ['192.168.23.3']
- cfg = {
- 'ntp': {
- 'pools': pools,
- 'servers': servers,
- }
- }
mycloud = self._get_cloud(distro)
-
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile', return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
-
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
- '/etc/ntp.conf',
- {'servers': servers, 'pools': pools})
-
- def test_ntp_conf_contents_match(self):
- """Test rendered contents of /etc/ntp.conf for ubuntu"""
- pools = ['0.mycompany.pool.ntp.org']
- servers = ['192.168.23.3']
+ ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ # Create ntp.conf.tmpl
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.write_ntp_config_template({}, mycloud)
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ default_pools = [
+ "{0}.{1}.pool.ntp.org".format(x, distro)
+ for x in range(0, cc_ntp.NR_POOL_SERVERS)]
+ self.assertEqual(
+ "servers []\npools {0}\n".format(default_pools),
+ content.decode())
+ self.assertIn(
+ "Adding distro default ntp pool servers: {0}".format(
+ ",".join(default_pools)),
+ self.logs.getvalue())
+
+ def test_ntp_handler_mocked_template(self):
+ """Test ntp handler renders ubuntu ntp.conf template."""
+ pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3', '192.168.23.4']
cfg = {
'ntp': {
'pools': pools,
- 'servers': servers,
+ 'servers': servers
}
}
mycloud = self._get_cloud('ubuntu')
- side_effect = [NTP_TEMPLATE.lstrip()]
-
- # work backwards from util.write_file and mock out call path
- # write_ntp_config_template()
- # cloud.get_template_filename()
- # os.path.isfile()
- # templater.render_to_file()
- # templater.render_from_file()
- # util.load_file()
- # util.write_file()
- #
- with mock.patch.object(util, 'write_file') as mockwrite:
- with mock.patch.object(util, 'load_file', side_effect=side_effect):
- with mock.patch.object(os.path, 'isfile', return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.write_ntp_config_template(cfg.get('ntp'),
- mycloud)
-
- mockwrite.assert_called_once_with(
- '/etc/ntp.conf',
- NTP_EXPECTED_UBUNTU,
- mode=420)
-
- def test_ntp_handler(self):
- """Test ntp handler renders ubuntu ntp.conf template"""
- pools = ['0.mycompany.pool.ntp.org']
- servers = ['192.168.23.3']
+ ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ # Create ntp.conf.tmpl
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ with mock.patch.object(util, 'which', return_value=None):
+ cc_ntp.handle('notimportant', cfg, mycloud, None, None)
+
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ self.assertEqual(
+ 'servers {0}\npools {1}\n'.format(servers, pools),
+ content.decode())
+
+ def test_ntp_handler_real_distro_templates(self):
+ """Test ntp handler renders the shipped distro ntp.conf templates."""
+ pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3', '192.168.23.4']
cfg = {
'ntp': {
'pools': pools,
- 'servers': servers,
+ 'servers': servers
}
}
- mycloud = self._get_cloud('ubuntu')
- side_effect = [NTP_TEMPLATE.lstrip()]
-
- with mock.patch.object(util, 'which', return_value=None):
- with mock.patch.object(os.path, 'exists'):
- with mock.patch.object(util, 'write_file') as mockwrite:
- with mock.patch.object(util, 'load_file',
- side_effect=side_effect):
- with mock.patch.object(os.path, 'isfile',
- return_value=True):
- with mock.patch.object(util, 'rename'):
- cc_ntp.handle("notimportant", cfg,
- mycloud, LOG, None)
-
- mockwrite.assert_called_once_with(
- '/etc/ntp.conf',
- NTP_EXPECTED_UBUNTU,
- mode=420)
-
- @mock.patch("cloudinit.config.cc_ntp.util")
- def test_no_ntpcfg_does_nothing(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- cc_ntp.handle('cc_ntp', {}, cc, LOG, [])
- self.assertFalse(cc.distro.install_packages.called)
- self.assertFalse(mock_util.subp.called)
+ ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ for distro in ('debian', 'ubuntu', 'fedora', 'rhel', 'sles'):
+ mycloud = self._get_cloud(distro)
+ root_dir = dirname(dirname(os.path.realpath(util.__file__)))
+ tmpl_file = os.path.join(
+ '{0}/templates/ntp.conf.{1}.tmpl'.format(root_dir, distro))
+ # Create a copy in our tmp_dir
+ shutil.copy(
+ tmpl_file,
+ os.path.join(self.new_root, 'ntp.conf.%s.tmpl' % distro))
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ with mock.patch.object(util, 'which', return_value=[True]):
+ cc_ntp.handle('notimportant', cfg, mycloud, None, None)
+
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ expected_servers = '\n'.join([
+ 'server {0} iburst'.format(server) for server in servers])
+ self.assertIn(
+ expected_servers, content.decode(),
+ 'failed to render ntp.conf for distro:{0}'.format(distro))
+ expected_pools = '\n'.join([
+ 'pool {0} iburst'.format(pool) for pool in pools])
+ self.assertIn(
+ expected_pools, content.decode(),
+ 'failed to render ntp.conf for distro:{0}'.format(distro))
+
+ def test_no_ntpcfg_does_nothing(self):
+ """When no ntp section is defined handler logs a warning and noops."""
+ cc_ntp.handle('cc_ntp', {}, None, None, [])
+ self.assertEqual(
+ 'Skipping module named cc_ntp, not present or disabled by cfg\n',
+ self.logs.getvalue())
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 3fd0069d..e382210d 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -15,12 +15,12 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_config(self):
# completely empty config should mean do nothing
(cmd, _timeout, _condition) = psc.load_power_state({})
- self.assertEqual(cmd, None)
+ self.assertIsNone(cmd)
def test_irrelevant_config(self):
# no power_state field in config should return None for cmd
(cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'})
- self.assertEqual(cmd, None)
+ self.assertIsNone(cmd)
def test_invalid_mode(self):
cfg = {'power_state': {'mode': 'gibberish'}}
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
new file mode 100644
index 00000000..52591b8b
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -0,0 +1,59 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_resizefs
+
+import textwrap
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class TestResizefs(unittest.TestCase):
+ def setUp(self):
+ super(TestResizefs, self).setUp()
+ self.name = "resizefs"
+
+ @mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output')
+ @mock.patch('cloudinit.config.cc_resizefs._get_gpart_output')
+ def test_skip_ufs_resize(self, gpart_out, dumpfs_out):
+ fs_type = "ufs"
+ resize_what = "/"
+ devpth = "/dev/da0p2"
+ dumpfs_out.return_value = (
+ "# newfs command for / (/dev/label/rootfs)\n"
+ "newfs -O 2 -U -a 4 -b 32768 -d 32768 -e 4096 "
+ "-f 4096 -g 16384 -h 64 -i 8192 -j -k 6408 -m 8 "
+ "-o time -s 58719232 /dev/label/rootfs\n")
+ gpart_out.return_value = textwrap.dedent("""\
+ => 40 62914480 da0 GPT (30G)
+ 40 1024 1 freebsd-boot (512K)
+ 1064 58719232 2 freebsd-ufs (28G)
+ 58720296 3145728 3 freebsd-swap (1.5G)
+ 61866024 1048496 - free - (512M)
+ """)
+ res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth)
+ self.assertTrue(res)
+
+ @mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output')
+ @mock.patch('cloudinit.config.cc_resizefs._get_gpart_output')
+ def test_skip_ufs_resize_roundup(self, gpart_out, dumpfs_out):
+ fs_type = "ufs"
+ resize_what = "/"
+ devpth = "/dev/da0p2"
+ dumpfs_out.return_value = (
+ "# newfs command for / (/dev/label/rootfs)\n"
+ "newfs -O 2 -U -a 4 -b 32768 -d 32768 -e 4096 "
+ "-f 4096 -g 16384 -h 64 -i 8192 -j -k 368 -m 8 "
+ "-o time -s 297080 /dev/label/rootfs\n")
+ gpart_out.return_value = textwrap.dedent("""\
+ => 34 297086 da0 GPT (145M)
+ 34 297086 1 freebsd-ufs (145M)
+ """)
+ res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth)
+ self.assertTrue(res)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index edb73d6d..e4d07622 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -419,7 +419,7 @@ class TestSnapConfig(FilesystemMockingTestCase):
def test_snap_config_add_snap_user_no_config(self):
usercfg = add_snap_user(cfg=None)
- self.assertEqual(usercfg, None)
+ self.assertIsNone(usercfg)
def test_snap_config_add_snap_user_not_dict(self):
cfg = ['foobar']
@@ -428,7 +428,7 @@ class TestSnapConfig(FilesystemMockingTestCase):
def test_snap_config_add_snap_user_no_email(self):
cfg = {'assertions': [], 'known': True}
usercfg = add_snap_user(cfg=cfg)
- self.assertEqual(usercfg, None)
+ self.assertIsNone(usercfg)
@mock.patch('cloudinit.config.cc_snap_config.util')
def test_snap_config_add_snap_user_email_only(self, mock_util):
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 4815bdb6..c4396df5 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -72,7 +72,7 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
for section in expected:
self.assertTrue(parser.has_section(section),
- "Contains section {}".format(section))
+ "Contains section {0}".format(section))
for k, v in expected[section].items():
self.assertEqual(parser.get(section, k), v)
@@ -109,7 +109,7 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
for section in expected:
self.assertTrue(parser.has_section(section),
- "Contains section {}".format(section))
+ "Contains section {0}".format(section))
for k, v in expected[section].items():
self.assertEqual(parser.get(section, k), v)
diff --git a/tests/unittests/test_helpers.py b/tests/unittests/test_helpers.py
index 955f8dfa..f1979e89 100644
--- a/tests/unittests/test_helpers.py
+++ b/tests/unittests/test_helpers.py
@@ -32,6 +32,6 @@ class TestPaths(test_helpers.ResourceUsingTestCase):
myds._instance_id = None
mypaths = self.getCloudPaths(myds)
- self.assertEqual(None, mypaths.get_ipath())
+ self.assertIsNone(mypaths.get_ipath())
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 89e75369..167ed01e 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -100,7 +100,8 @@ STATIC_EXPECTED_1 = {
'gateway': '10.0.0.1',
'dns_search': ['foo.com'], 'type': 'static',
'netmask': '255.255.255.0',
- 'dns_nameservers': ['10.0.1.1']}],
+ 'dns_nameservers': ['10.0.1.1'],
+ 'address': '10.0.0.2'}],
}
# Examples (and expected outputs for various renderers).
@@ -136,7 +137,7 @@ OS_SAMPLES = [
"""
# Created by cloud-init on instance boot automatically, do not edit.
#
-BOOTPROTO=static
+BOOTPROTO=none
DEFROUTE=yes
DEVICE=eth0
GATEWAY=172.19.3.254
@@ -204,38 +205,14 @@ nameserver 172.19.0.12
# Created by cloud-init on instance boot automatically, do not edit.
#
BOOTPROTO=none
-DEVICE=eth0
-HWADDR=fa:16:3e:ed:9a:59
-NM_CONTROLLED=no
-ONBOOT=yes
-TYPE=Ethernet
-USERCTL=no
-""".lstrip()),
- ('etc/sysconfig/network-scripts/ifcfg-eth0:0',
- """
-# Created by cloud-init on instance boot automatically, do not edit.
-#
-BOOTPROTO=static
DEFROUTE=yes
-DEVICE=eth0:0
+DEVICE=eth0
GATEWAY=172.19.3.254
HWADDR=fa:16:3e:ed:9a:59
IPADDR=172.19.1.34
+IPADDR1=10.0.0.10
NETMASK=255.255.252.0
-NM_CONTROLLED=no
-ONBOOT=yes
-TYPE=Ethernet
-USERCTL=no
-""".lstrip()),
- ('etc/sysconfig/network-scripts/ifcfg-eth0:1',
- """
-# Created by cloud-init on instance boot automatically, do not edit.
-#
-BOOTPROTO=static
-DEVICE=eth0:1
-HWADDR=fa:16:3e:ed:9a:59
-IPADDR=10.0.0.10
-NETMASK=255.255.255.0
+NETMASK1=255.255.255.0
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
@@ -265,7 +242,7 @@ nameserver 172.19.0.12
}],
"ip_address": "172.19.1.34", "id": "network0"
}, {
- "network_id": "public-ipv6",
+ "network_id": "public-ipv6-a",
"type": "ipv6", "netmask": "",
"link": "tap1a81968a-79",
"routes": [
@@ -276,6 +253,20 @@ nameserver 172.19.0.12
}
],
"ip_address": "2001:DB8::10", "id": "network1"
+ }, {
+ "network_id": "public-ipv6-b",
+ "type": "ipv6", "netmask": "64",
+ "link": "tap1a81968a-79",
+ "routes": [
+ ],
+ "ip_address": "2001:DB9::10", "id": "network2"
+ }, {
+ "network_id": "public-ipv6-c",
+ "type": "ipv6", "netmask": "64",
+ "link": "tap1a81968a-79",
+ "routes": [
+ ],
+ "ip_address": "2001:DB10::10", "id": "network3"
}],
"links": [
{
@@ -295,41 +286,16 @@ nameserver 172.19.0.12
# Created by cloud-init on instance boot automatically, do not edit.
#
BOOTPROTO=none
-DEVICE=eth0
-HWADDR=fa:16:3e:ed:9a:59
-NM_CONTROLLED=no
-ONBOOT=yes
-TYPE=Ethernet
-USERCTL=no
-""".lstrip()),
- ('etc/sysconfig/network-scripts/ifcfg-eth0:0',
- """
-# Created by cloud-init on instance boot automatically, do not edit.
-#
-BOOTPROTO=static
DEFROUTE=yes
-DEVICE=eth0:0
+DEVICE=eth0
GATEWAY=172.19.3.254
HWADDR=fa:16:3e:ed:9a:59
IPADDR=172.19.1.34
-NETMASK=255.255.252.0
-NM_CONTROLLED=no
-ONBOOT=yes
-TYPE=Ethernet
-USERCTL=no
-""".lstrip()),
- ('etc/sysconfig/network-scripts/ifcfg-eth0:1',
- """
-# Created by cloud-init on instance boot automatically, do not edit.
-#
-BOOTPROTO=static
-DEFROUTE=yes
-DEVICE=eth0:1
-HWADDR=fa:16:3e:ed:9a:59
IPV6ADDR=2001:DB8::10
+IPV6ADDR_SECONDARIES="2001:DB9::10/64 2001:DB10::10/64"
IPV6INIT=yes
IPV6_DEFAULTGW=2001:DB8::1
-NETMASK=
+NETMASK=255.255.252.0
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
@@ -440,7 +406,7 @@ NETWORK_CONFIGS = {
- sach.maas
- wark.maas
routes:
- - to: 0.0.0.0/0.0.0.0
+ - to: 0.0.0.0/0
via: 65.61.151.37
set-name: eth99
""").rstrip(' '),
@@ -517,11 +483,15 @@ auto eth1
iface eth1 inet manual
bond-master bond0
bond-mode active-backup
+ bond-xmit-hash-policy layer3+4
+ bond_miimon 100
auto eth2
iface eth2 inet manual
bond-master bond0
bond-mode active-backup
+ bond-xmit-hash-policy layer3+4
+ bond_miimon 100
iface eth3 inet manual
@@ -534,6 +504,8 @@ auto bond0
iface bond0 inet6 dhcp
bond-mode active-backup
bond-slaves none
+ bond-xmit-hash-policy layer3+4
+ bond_miimon 100
hwaddress aa:bb:cc:dd:ee:ff
auto br0
@@ -557,6 +529,7 @@ iface eth0.101 inet static
dns-nameservers 192.168.0.10 10.23.23.134
dns-search barley.maas sacchromyces.maas brettanomyces.maas
gateway 192.168.0.1
+ hwaddress aa:bb:cc:dd:ee:11
mtu 1500
vlan-raw-device eth0
vlan_id 101
@@ -658,7 +631,9 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
- eth1
- eth2
parameters:
+ mii-monitor-interval: 100
mode: active-backup
+ transmit-hash-policy: layer3+4
bridges:
br0:
addresses:
@@ -679,6 +654,7 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
gateway4: 192.168.0.1
id: 101
link: eth0
+ macaddress: aa:bb:cc:dd:ee:11
nameservers:
addresses:
- 192.168.0.10
@@ -722,6 +698,7 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
name: eth0.101
vlan_link: eth0
vlan_id: 101
+ mac_address: aa:bb:cc:dd:ee:11
mtu: 1500
subnets:
- type: static
@@ -747,6 +724,8 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
- eth2
params:
bond-mode: active-backup
+ bond_miimon: 100
+ bond-xmit-hash-policy: "layer3+4"
subnets:
- type: dhcp6
# A Bond VLAN.
@@ -880,6 +859,82 @@ USERCTL=no
""".lstrip()
self.assertEqual(expected_content, content)
+ def test_multiple_ipv4_default_gateways(self):
+ """ValueError is raised when duplicate ipv4 gateways exist."""
+ net_json = {
+ "services": [{"type": "dns", "address": "172.19.0.12"}],
+ "networks": [{
+ "network_id": "dacd568d-5be6-4786-91fe-750c374b78b4",
+ "type": "ipv4", "netmask": "255.255.252.0",
+ "link": "tap1a81968a-79",
+ "routes": [{
+ "netmask": "0.0.0.0",
+ "network": "0.0.0.0",
+ "gateway": "172.19.3.254",
+ }, {
+ "netmask": "0.0.0.0", # A second default gateway
+ "network": "0.0.0.0",
+ "gateway": "172.20.3.254",
+ }],
+ "ip_address": "172.19.1.34", "id": "network0"
+ }],
+ "links": [
+ {
+ "ethernet_mac_address": "fa:16:3e:ed:9a:59",
+ "mtu": None, "type": "bridge", "id":
+ "tap1a81968a-79",
+ "vif_id": "1a81968a-797a-400f-8a80-567f997eb93f"
+ },
+ ],
+ }
+ macs = {'fa:16:3e:ed:9a:59': 'eth0'}
+ render_dir = self.tmp_dir()
+ network_cfg = openstack.convert_net_json(net_json, known_macs=macs)
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+ renderer = sysconfig.Renderer()
+ with self.assertRaises(ValueError):
+ renderer.render_network_state(ns, render_dir)
+ self.assertEqual([], os.listdir(render_dir))
+
+ def test_multiple_ipv6_default_gateways(self):
+ """ValueError is raised when duplicate ipv6 gateways exist."""
+ net_json = {
+ "services": [{"type": "dns", "address": "172.19.0.12"}],
+ "networks": [{
+ "network_id": "public-ipv6",
+ "type": "ipv6", "netmask": "",
+ "link": "tap1a81968a-79",
+ "routes": [{
+ "gateway": "2001:DB8::1",
+ "netmask": "::",
+ "network": "::"
+ }, {
+ "gateway": "2001:DB9::1",
+ "netmask": "::",
+ "network": "::"
+ }],
+ "ip_address": "2001:DB8::10", "id": "network1"
+ }],
+ "links": [
+ {
+ "ethernet_mac_address": "fa:16:3e:ed:9a:59",
+ "mtu": None, "type": "bridge", "id":
+ "tap1a81968a-79",
+ "vif_id": "1a81968a-797a-400f-8a80-567f997eb93f"
+ },
+ ],
+ }
+ macs = {'fa:16:3e:ed:9a:59': 'eth0'}
+ render_dir = self.tmp_dir()
+ network_cfg = openstack.convert_net_json(net_json, known_macs=macs)
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+ renderer = sysconfig.Renderer()
+ with self.assertRaises(ValueError):
+ renderer.render_network_state(ns, render_dir)
+ self.assertEqual([], os.listdir(render_dir))
+
def test_openstack_rendering_samples(self):
for os_sample in OS_SAMPLES:
render_dir = self.tmp_dir()
@@ -996,7 +1051,7 @@ class TestNetplanNetRendering(CiTestCase):
render_target = 'netplan.yaml'
renderer = netplan.Renderer(
{'netplan_path': render_target, 'postcmds': False})
- renderer.render_network_state(render_dir, ns)
+ renderer.render_network_state(ns, render_dir)
self.assertTrue(os.path.exists(os.path.join(render_dir,
render_target)))
@@ -1101,7 +1156,7 @@ class TestNetplanPostcommands(CiTestCase):
render_target = 'netplan.yaml'
renderer = netplan.Renderer(
{'netplan_path': render_target, 'postcmds': True})
- renderer.render_network_state(render_dir, ns)
+ renderer.render_network_state(ns, render_dir)
mock_netplan_generate.assert_called_with(run=True)
mock_net_setup_link.assert_called_with(run=True)
@@ -1120,14 +1175,14 @@ class TestNetplanPostcommands(CiTestCase):
render_target = 'netplan.yaml'
renderer = netplan.Renderer(
{'netplan_path': render_target, 'postcmds': True})
- renderer.render_network_state(render_dir, ns)
-
expected = [
mock.call(['netplan', 'generate'], capture=True),
mock.call(['udevadm', 'test-builtin', 'net_setup_link',
'/sys/class/net/lo'], capture=True),
]
- mock_subp.assert_has_calls(expected)
+ with mock.patch.object(os.path, 'islink', return_value=True):
+ renderer.render_network_state(ns, render_dir)
+ mock_subp.assert_has_calls(expected)
class TestEniNetworkStateToEni(CiTestCase):
@@ -1256,7 +1311,7 @@ class TestCmdlineReadKernelConfig(CiTestCase):
files = sorted(populate_dir(self.tmp_dir(), content))
found = cmdline.read_kernel_cmdline_config(
files=files, cmdline='foo root=/dev/sda', mac_addrs=self.macs)
- self.assertEqual(found, None)
+ self.assertIsNone(found)
def test_ip_cmdline_both_ip_ip6(self):
content = {'net-eth0.conf': DHCP_CONTENT_1,
@@ -1277,9 +1332,9 @@ class TestCmdlineReadKernelConfig(CiTestCase):
class TestNetplanRoundTrip(CiTestCase):
def _render_and_read(self, network_config=None, state=None,
- netplan_path=None, dir=None):
- if dir is None:
- dir = self.tmp_dir()
+ netplan_path=None, target=None):
+ if target is None:
+ target = self.tmp_dir()
if network_config:
ns = network_state.parse_net_config_data(network_config)
@@ -1294,8 +1349,8 @@ class TestNetplanRoundTrip(CiTestCase):
renderer = netplan.Renderer(
config={'netplan_path': netplan_path})
- renderer.render_network_state(dir, ns)
- return dir2dict(dir)
+ renderer.render_network_state(ns, target)
+ return dir2dict(target)
def testsimple_render_small_netplan(self):
entry = NETWORK_CONFIGS['small']
@@ -1462,24 +1517,24 @@ class TestNetRenderers(CiTestCase):
class TestGetInterfacesByMac(CiTestCase):
- _data = {'devices': ['enp0s1', 'enp0s2', 'bond1', 'bridge1',
- 'bridge1-nic', 'tun0', 'bond1.101'],
- 'bonds': ['bond1'],
+ _data = {'bonds': ['bond1'],
'bridges': ['bridge1'],
'vlans': ['bond1.101'],
'own_macs': ['enp0s1', 'enp0s2', 'bridge1-nic', 'bridge1',
- 'bond1.101'],
+ 'bond1.101', 'lo'],
'macs': {'enp0s1': 'aa:aa:aa:aa:aa:01',
'enp0s2': 'aa:aa:aa:aa:aa:02',
'bond1': 'aa:aa:aa:aa:aa:01',
'bond1.101': 'aa:aa:aa:aa:aa:01',
'bridge1': 'aa:aa:aa:aa:aa:03',
'bridge1-nic': 'aa:aa:aa:aa:aa:03',
+ 'lo': '00:00:00:00:00:00',
+ 'greptap0': '00:00:00:00:00:00',
'tun0': None}}
data = {}
def _se_get_devicelist(self):
- return self.data['devices']
+ return list(self.data['devices'])
def _se_get_interface_mac(self, name):
return self.data['macs'][name]
@@ -1495,6 +1550,7 @@ class TestGetInterfacesByMac(CiTestCase):
def _mock_setup(self):
self.data = copy.deepcopy(self._data)
+ self.data['devices'] = set(list(self.data['macs'].keys()))
mocks = ('get_devicelist', 'get_interface_mac', 'is_bridge',
'interface_has_own_mac', 'is_vlan')
self.mocks = {}
@@ -1522,7 +1578,7 @@ class TestGetInterfacesByMac(CiTestCase):
[mock.call('enp0s1'), mock.call('bond1')], any_order=True)
self.assertEqual(
{'aa:aa:aa:aa:aa:01': 'enp0s1', 'aa:aa:aa:aa:aa:02': 'enp0s2',
- 'aa:aa:aa:aa:aa:03': 'bridge1-nic'},
+ 'aa:aa:aa:aa:aa:03': 'bridge1-nic', '00:00:00:00:00:00': 'lo'},
ret)
def test_excludes_bridges(self):
@@ -1531,7 +1587,7 @@ class TestGetInterfacesByMac(CiTestCase):
# set everything other than 'b1' to be a bridge.
# then expect b1 is the only thing left.
self.data['macs']['b1'] = 'aa:aa:aa:aa:aa:b1'
- self.data['devices'].append('b1')
+ self.data['devices'].add('b1')
self.data['bonds'] = []
self.data['own_macs'] = self.data['devices']
self.data['bridges'] = [f for f in self.data['devices'] if f != "b1"]
@@ -1548,7 +1604,7 @@ class TestGetInterfacesByMac(CiTestCase):
# set everything other than 'b1' to be a vlan.
# then expect b1 is the only thing left.
self.data['macs']['b1'] = 'aa:aa:aa:aa:aa:b1'
- self.data['devices'].append('b1')
+ self.data['devices'].add('b1')
self.data['bonds'] = []
self.data['bridges'] = []
self.data['own_macs'] = self.data['devices']
@@ -1560,6 +1616,16 @@ class TestGetInterfacesByMac(CiTestCase):
mock.call('b1')],
any_order=True)
+ def test_duplicates_of_empty_mac_are_ok(self):
+ """Duplicate macs of 00:00:00:00:00:00 should be skipped."""
+ self._mock_setup()
+ empty_mac = "00:00:00:00:00:00"
+ addnics = ('greptap1', 'lo', 'greptap2')
+ self.data['macs'].update(dict((k, empty_mac) for k in addnics))
+ self.data['devices'].update(set(addnics))
+ ret = net.get_interfaces_by_mac()
+ self.assertEqual('lo', ret[empty_mac])
+
def _gzip_data(data):
with io.BytesIO() as iobuf:
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 5d21b4b7..014aa6a3 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -44,7 +44,7 @@ class TestGetCfgOptionListOrStr(helpers.TestCase):
"""None is returned if key is not found and no default given."""
config = {}
result = util.get_cfg_option_list(config, "key")
- self.assertEqual(None, result)
+ self.assertIsNone(result)
def test_not_found_with_default(self):
"""Default is returned if key is not found."""
@@ -432,13 +432,13 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
def test_none_returned_if_neither_source_has_data(self):
self.patch_mapping({})
self._configure_dmidecode_return('key', 'value')
- self.assertEqual(None, util.read_dmi_data('expect-fail'))
+ self.assertIsNone(util.read_dmi_data('expect-fail'))
def test_none_returned_if_dmidecode_not_in_path(self):
self.patched_funcs.enter_context(
mock.patch.object(util, 'which', lambda _: False))
self.patch_mapping({})
- self.assertEqual(None, util.read_dmi_data('expect-fail'))
+ self.assertIsNone(util.read_dmi_data('expect-fail'))
def test_dots_returned_instead_of_foxfox(self):
# uninitialized dmi values show as \xff, return those as .
@@ -596,7 +596,8 @@ class TestSubp(helpers.TestCase):
def test_subp_capture_stderr(self):
data = b'hello world'
(out, err) = util.subp(self.stdin2err, capture=True,
- decode=False, data=data)
+ decode=False, data=data,
+ update_env={'LC_ALL': 'C'})
self.assertEqual(err, data)
self.assertEqual(out, b'')
@@ -625,8 +626,8 @@ class TestSubp(helpers.TestCase):
def test_returns_none_if_no_capture(self):
(out, err) = util.subp(self.stdin2out, data=b'', capture=False)
- self.assertEqual(err, None)
- self.assertEqual(out, None)
+ self.assertIsNone(err)
+ self.assertIsNone(out)
def test_bunch_of_slashes_in_path(self):
self.assertEqual("/target/my/path/",
@@ -711,4 +712,73 @@ class TestProcessExecutionError(helpers.TestCase):
)).format(description=self.empty_description,
empty_attr=self.empty_attr))
+
+class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
+ def test_id_in_os_release_quoted(self):
+ """os-release containing ID="ubuntu-core" is snappy."""
+ orcontent = '\n'.join(['ID="ubuntu-core"', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ def test_id_in_os_release(self):
+ """os-release containing ID=ubuntu-core is snappy."""
+ orcontent = '\n'.join(['ID=ubuntu-core', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_bad_content_in_os_release_no_effect(self, m_cmdline):
+ """malformed os-release should not raise exception."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ orcontent = '\n'.join(['IDubuntu-core', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot()
+ self.assertFalse(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_snap_core_in_cmdline_is_snappy(self, m_cmdline):
+ """The string snap_core= in kernel cmdline indicates snappy."""
+ cmdline = (
+ "BOOT_IMAGE=(loop)/kernel.img root=LABEL=writable "
+ "snap_core=core_x1.snap snap_kernel=pc-kernel_x1.snap ro "
+ "net.ifnames=0 init=/lib/systemd/systemd console=tty1 "
+ "console=ttyS0 panic=-1")
+ m_cmdline.return_value = cmdline
+ self.assertTrue(util.system_is_snappy())
+ self.assertTrue(m_cmdline.call_count > 0)
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_nothing_found_is_not_snappy(self, m_cmdline):
+ """If no positive identification, then not snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ self.reRoot()
+ self.assertFalse(util.system_is_snappy())
+ self.assertTrue(m_cmdline.call_count > 0)
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_channel_ini_with_snappy_is_snappy(self, m_cmdline):
+ """A Channel.ini file with 'ubuntu-core' indicates snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ root_d = self.tmp_dir()
+ content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""])
+ helpers.populate_dir(
+ root_d, {'etc/system-image/channel.ini': content})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_system_image_config_dir_is_snappy(self, m_cmdline):
+ """Existence of /etc/system-image/config.d indicates snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ root_d = self.tmp_dir()
+ helpers.populate_dir(
+ root_d, {'etc/system-image/config.d/my.file': "_unused"})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
# vi: ts=4 expandtab