diff options
| author | Brett Holman <bholman.devel@gmail.com> | 2021-12-03 13:11:46 -0700 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-12-03 13:11:46 -0700 | 
| commit | 039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch) | |
| tree | 5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /tests/unittests/sources/test_altcloud.py | |
| parent | ffa6fc88249aa080aa31811a45569a45e567418a (diff) | |
| download | vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip | |
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/
such that any source file located at cloudinit/path/to/file.py may have a
corresponding unit test file at test/unittests/path/to/test_file.py.
Noteworthy Comments:
====================
Four different duplicate test files existed:
test_{gpg,util,cc_mounts,cc_resolv_conf}.py
Each of these duplicate file pairs has been merged together. This is a
break in git history for these files.
The test suite appears to have a dependency on test order. Changing test
order causes some tests to fail. This should be rectified, but for now
some tests have been modified in
tests/unittests/config/test_set_passwords.py.
A helper class name starts with "Test" which causes pytest to try
executing it as a test case, which then throws warnings "due to Class
having __init__()".  Silence by changing the name of the class.
# helpers.py is imported in many test files, import paths change
cloudinit/tests/helpers.py -> tests/unittests/helpers.py
# Move directories:
cloudinit/distros/tests -> tests/unittests/distros
cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel
cloudinit/cmd/tests -> tests/unittests/cmd/
cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers
cloudinit/sources/tests -> tests/unittests/sources
cloudinit/net/tests -> tests/unittests/net
cloudinit/config/tests -> tests/unittests/config
cloudinit/analyze/tests/ -> tests/unittests/analyze/
# Standardize tests already in tests/unittests/
test_datasource -> sources
test_distros -> distros
test_vmware -> sources/vmware
test_handler -> config        # this contains cloudconfig module tests
test_runs -> runs
Diffstat (limited to 'tests/unittests/sources/test_altcloud.py')
| -rw-r--r-- | tests/unittests/sources/test_altcloud.py | 450 | 
1 files changed, 450 insertions, 0 deletions
| diff --git a/tests/unittests/sources/test_altcloud.py b/tests/unittests/sources/test_altcloud.py new file mode 100644 index 00000000..7384c104 --- /dev/null +++ b/tests/unittests/sources/test_altcloud.py @@ -0,0 +1,450 @@ +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' + +import os +import shutil +import tempfile + +from cloudinit import dmi +from cloudinit import helpers +from cloudinit import subp +from cloudinit import util + +from tests.unittests.helpers import CiTestCase, mock + +import cloudinit.sources.DataSourceAltCloud as dsac + +OS_UNAME_ORIG = getattr(os, 'uname') + + +def _write_user_data_files(mount_dir, value): +    ''' +    Populate the deltacloud_user_data_file the user_data_file +    which would be populated with user data. +    ''' +    deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' +    user_data_file = mount_dir + '/user-data.txt' + +    udfile = open(deltacloud_user_data_file, 'w') +    udfile.write(value) +    udfile.close() +    os.chmod(deltacloud_user_data_file, 0o664) + +    udfile = open(user_data_file, 'w') +    udfile.write(value) +    udfile.close() +    os.chmod(user_data_file, 0o664) + + +def _remove_user_data_files(mount_dir, +                            dc_file=True, +                            non_dc_file=True): +    ''' +    Remove the test files: deltacloud_user_data_file and +    user_data_file +    ''' +    deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' +    user_data_file = mount_dir + '/user-data.txt' + +    # Ignore any failures removeing files that are already gone. +    if dc_file: +        try: +            os.remove(deltacloud_user_data_file) +        except OSError: +            pass + +    if non_dc_file: +        try: +            os.remove(user_data_file) +        except OSError: +            pass + + +def _dmi_data(expected): +    ''' +    Spoof the data received over DMI +    ''' +    def _data(key): +        return expected + +    return _data + + +class TestGetCloudType(CiTestCase): +    '''Test to exercise method: DataSourceAltCloud.get_cloud_type()''' + +    with_logs = True + +    def setUp(self): +        '''Set up.''' +        super(TestGetCloudType, self).setUp() +        self.tmp = self.tmp_dir() +        self.paths = helpers.Paths({'cloud_dir': self.tmp}) +        self.dmi_data = dmi.read_dmi_data +        # We have a different code path for arm to deal with LP1243287 +        # We have to switch arch to x86_64 to avoid test failure +        force_arch('x86_64') + +    def tearDown(self): +        # Reset +        dmi.read_dmi_data = self.dmi_data +        force_arch() + +    def test_cloud_info_file_ioerror(self): +        """Return UNKNOWN when /etc/sysconfig/cloud-info exists but errors.""" +        self.assertEqual('/etc/sysconfig/cloud-info', dsac.CLOUD_INFO_FILE) +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        # Attempting to read the directory generates IOError +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.tmp): +            self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) +        self.assertIn( +            "[Errno 21] Is a directory: '%s'" % self.tmp, +            self.logs.getvalue()) + +    def test_cloud_info_file(self): +        """Return uppercase stripped content from /etc/sysconfig/cloud-info.""" +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        cloud_info = self.tmp_path('cloud-info', dir=self.tmp) +        util.write_file(cloud_info, ' OverRiDdeN CloudType ') +        # Attempting to read the directory generates IOError +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', cloud_info): +            self.assertEqual('OVERRIDDEN CLOUDTYPE', dsrc.get_cloud_type()) + +    def test_rhev(self): +        ''' +        Test method get_cloud_type() for RHEVm systems. +        Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor +        ''' +        dmi.read_dmi_data = _dmi_data('RHEV') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual('RHEV', dsrc.get_cloud_type()) + +    def test_vsphere(self): +        ''' +        Test method get_cloud_type() for vSphere systems. +        Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor +        ''' +        dmi.read_dmi_data = _dmi_data('VMware Virtual Platform') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual('VSPHERE', dsrc.get_cloud_type()) + +    def test_unknown(self): +        ''' +        Test method get_cloud_type() for unknown systems. +        Forcing read_dmi_data return to match an unrecognized return. +        ''' +        dmi.read_dmi_data = _dmi_data('Unrecognized Platform') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) + + +class TestGetDataCloudInfoFile(CiTestCase): +    ''' +    Test to exercise method: DataSourceAltCloud.get_data() +    With a contrived CLOUD_INFO_FILE +    ''' +    def setUp(self): +        '''Set up.''' +        self.tmp = self.tmp_dir() +        self.paths = helpers.Paths( +            {'cloud_dir': self.tmp, 'run_dir': self.tmp}) +        self.cloud_info_file = self.tmp_path('cloud-info', dir=self.tmp) + +    def test_rhev(self): +        '''Success Test module get_data() forcing RHEV.''' + +        util.write_file(self.cloud_info_file, 'RHEV') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_rhevm = lambda: True +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): +            self.assertEqual(True, dsrc.get_data()) +        self.assertEqual('altcloud', dsrc.cloud_name) +        self.assertEqual('altcloud', dsrc.platform_type) +        self.assertEqual('rhev (/dev/fd0)', dsrc.subplatform) + +    def test_vsphere(self): +        '''Success Test module get_data() forcing VSPHERE.''' + +        util.write_file(self.cloud_info_file, 'VSPHERE') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_vsphere = lambda: True +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): +            self.assertEqual(True, dsrc.get_data()) +        self.assertEqual('altcloud', dsrc.cloud_name) +        self.assertEqual('altcloud', dsrc.platform_type) +        self.assertEqual('vsphere (unknown)', dsrc.subplatform) + +    def test_fail_rhev(self): +        '''Failure Test module get_data() forcing RHEV.''' + +        util.write_file(self.cloud_info_file, 'RHEV') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_rhevm = lambda: False +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): +            self.assertEqual(False, dsrc.get_data()) + +    def test_fail_vsphere(self): +        '''Failure Test module get_data() forcing VSPHERE.''' + +        util.write_file(self.cloud_info_file, 'VSPHERE') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_vsphere = lambda: False +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): +            self.assertEqual(False, dsrc.get_data()) + +    def test_unrecognized(self): +        '''Failure Test module get_data() forcing unrecognized.''' + +        util.write_file(self.cloud_info_file, 'unrecognized') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file): +            self.assertEqual(False, dsrc.get_data()) + + +class TestGetDataNoCloudInfoFile(CiTestCase): +    ''' +    Test to exercise method: DataSourceAltCloud.get_data() +    Without a CLOUD_INFO_FILE +    ''' +    def setUp(self): +        '''Set up.''' +        self.tmp = self.tmp_dir() +        self.paths = helpers.Paths( +            {'cloud_dir': self.tmp, 'run_dir': self.tmp}) +        self.dmi_data = dmi.read_dmi_data +        dsac.CLOUD_INFO_FILE = \ +            'no such file' +        # We have a different code path for arm to deal with LP1243287 +        # We have to switch arch to x86_64 to avoid test failure +        force_arch('x86_64') + +    def tearDown(self): +        # Reset +        dsac.CLOUD_INFO_FILE = \ +            '/etc/sysconfig/cloud-info' +        dmi.read_dmi_data = self.dmi_data +        # Return back to original arch +        force_arch() + +    def test_rhev_no_cloud_file(self): +        '''Test No cloud info file module get_data() forcing RHEV.''' + +        dmi.read_dmi_data = _dmi_data('RHEV Hypervisor') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_rhevm = lambda: True +        self.assertEqual(True, dsrc.get_data()) + +    def test_vsphere_no_cloud_file(self): +        '''Test No cloud info file module get_data() forcing VSPHERE.''' + +        dmi.read_dmi_data = _dmi_data('VMware Virtual Platform') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        dsrc.user_data_vsphere = lambda: True +        self.assertEqual(True, dsrc.get_data()) + +    def test_failure_no_cloud_file(self): +        '''Test No cloud info file module get_data() forcing unrecognized.''' + +        dmi.read_dmi_data = _dmi_data('Unrecognized Platform') +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.get_data()) + + +class TestUserDataRhevm(CiTestCase): +    ''' +    Test to exercise method: DataSourceAltCloud.user_data_rhevm() +    ''' +    def setUp(self): +        '''Set up.''' +        self.paths = helpers.Paths({'cloud_dir': '/tmp'}) +        self.mount_dir = self.tmp_dir() +        _write_user_data_files(self.mount_dir, 'test user data') +        self.add_patch( +            'cloudinit.sources.DataSourceAltCloud.modprobe_floppy', +            'm_modprobe_floppy', return_value=None) +        self.add_patch( +            'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle', +            'm_udevadm_settle', return_value=('', '')) +        self.add_patch( +            'cloudinit.sources.DataSourceAltCloud.util.mount_cb', +            'm_mount_cb') + +    def test_mount_cb_fails(self): +        '''Test user_data_rhevm() where mount_cb fails.''' + +        self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_rhevm()) + +    def test_modprobe_fails(self): +        '''Test user_data_rhevm() where modprobe fails.''' + +        self.m_modprobe_floppy.side_effect = subp.ProcessExecutionError( +            "Failed modprobe") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_rhevm()) + +    def test_no_modprobe_cmd(self): +        '''Test user_data_rhevm() with no modprobe command.''' + +        self.m_modprobe_floppy.side_effect = subp.ProcessExecutionError( +            "No such file or dir") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_rhevm()) + +    def test_udevadm_fails(self): +        '''Test user_data_rhevm() where udevadm fails.''' + +        self.m_udevadm_settle.side_effect = subp.ProcessExecutionError( +            "Failed settle.") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_rhevm()) + +    def test_no_udevadm_cmd(self): +        '''Test user_data_rhevm() with no udevadm command.''' + +        self.m_udevadm_settle.side_effect = OSError("No such file or dir") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_rhevm()) + + +class TestUserDataVsphere(CiTestCase): +    ''' +    Test to exercise method: DataSourceAltCloud.user_data_vsphere() +    ''' +    def setUp(self): +        '''Set up.''' +        self.tmp = self.tmp_dir() +        self.paths = helpers.Paths({'cloud_dir': self.tmp}) +        self.mount_dir = tempfile.mkdtemp() + +        _write_user_data_files(self.mount_dir, 'test user data') + +    def tearDown(self): +        # Reset + +        _remove_user_data_files(self.mount_dir) + +        # Attempt to remove the temp dir ignoring errors +        try: +            shutil.rmtree(self.mount_dir) +        except OSError: +            pass + +        dsac.CLOUD_INFO_FILE = \ +            '/etc/sysconfig/cloud-info' + +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") +    def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with): +        '''Test user_data_vsphere() where mount_cb fails.''' + +        m_mount_cb.return_value = [] +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_vsphere()) +        self.assertEqual(0, m_mount_cb.call_count) + +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") +    def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with): +        '''Test user_data_vsphere() where mount_cb fails.''' + +        m_find_devs_with.return_value = ["/dev/mock/cdrom"] +        m_mount_cb.side_effect = util.MountFailedError("Unable To mount") +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        self.assertEqual(False, dsrc.user_data_vsphere()) +        self.assertEqual(1, m_find_devs_with.call_count) +        self.assertEqual(1, m_mount_cb.call_count) + +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") +    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") +    def test_user_data_vsphere_success(self, m_mount_cb, m_find_devs_with): +        """Test user_data_vsphere() where successful.""" +        m_find_devs_with.return_value = ["/dev/mock/cdrom"] +        m_mount_cb.return_value = 'raw userdata from cdrom' +        dsrc = dsac.DataSourceAltCloud({}, None, self.paths) +        cloud_info = self.tmp_path('cloud-info', dir=self.tmp) +        util.write_file(cloud_info, 'VSPHERE') +        self.assertEqual(True, dsrc.user_data_vsphere()) +        m_find_devs_with.assert_called_once_with('LABEL=CDROM') +        m_mount_cb.assert_called_once_with( +            '/dev/mock/cdrom', dsac.read_user_data_callback) +        with mock.patch.object(dsrc, 'get_cloud_type', return_value='VSPHERE'): +            self.assertEqual('vsphere (/dev/mock/cdrom)', dsrc.subplatform) + + +class TestReadUserDataCallback(CiTestCase): +    ''' +    Test to exercise method: DataSourceAltCloud.read_user_data_callback() +    ''' +    def setUp(self): +        '''Set up.''' +        self.paths = helpers.Paths({'cloud_dir': '/tmp'}) +        self.mount_dir = tempfile.mkdtemp() + +        _write_user_data_files(self.mount_dir, 'test user data') + +    def tearDown(self): +        # Reset + +        _remove_user_data_files(self.mount_dir) + +        # Attempt to remove the temp dir ignoring errors +        try: +            shutil.rmtree(self.mount_dir) +        except OSError: +            pass + +    def test_callback_both(self): +        '''Test read_user_data_callback() with both files.''' + +        self.assertEqual('test user data', +                         dsac.read_user_data_callback(self.mount_dir)) + +    def test_callback_dc(self): +        '''Test read_user_data_callback() with only DC file.''' + +        _remove_user_data_files(self.mount_dir, +                                dc_file=False, +                                non_dc_file=True) + +        self.assertEqual('test user data', +                         dsac.read_user_data_callback(self.mount_dir)) + +    def test_callback_non_dc(self): +        '''Test read_user_data_callback() with only non-DC file.''' + +        _remove_user_data_files(self.mount_dir, +                                dc_file=True, +                                non_dc_file=False) + +        self.assertEqual('test user data', +                         dsac.read_user_data_callback(self.mount_dir)) + +    def test_callback_none(self): +        '''Test read_user_data_callback() no files are found.''' + +        _remove_user_data_files(self.mount_dir) +        self.assertIsNone(dsac.read_user_data_callback(self.mount_dir)) + + +def force_arch(arch=None): + +    def _os_uname(): +        return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', arch) + +    if arch: +        setattr(os, 'uname', _os_uname) +    elif arch is None: +        setattr(os, 'uname', OS_UNAME_ORIG) + +# vi: ts=4 expandtab | 
