diff options
| author | Chad Smith <chad.smith@canonical.com> | 2017-12-05 16:25:11 -0700 | 
|---|---|---|
| committer | Chad Smith <chad.smith@canonical.com> | 2017-12-05 16:25:11 -0700 | 
| commit | 30b4d15764a1a9644379cf95770e8b2480856882 (patch) | |
| tree | 102b18e80e5ff8bf383a7fe35e56f88328cd924a /cloudinit/cmd/tests/test_status.py | |
| parent | 47016791ca5e97d80e45d3f100bc4e5d0b88627d (diff) | |
| download | vyos-cloud-init-30b4d15764a1a9644379cf95770e8b2480856882.tar.gz vyos-cloud-init-30b4d15764a1a9644379cf95770e8b2480856882.zip  | |
cli: Add clean and status subcommands
The 'cloud-init clean' command allows a user or script to clear cloud-init
artifacts from the system so that cloud-init sees the system as
unconfigured upon reboot. Optional parameters can be provided to remove
cloud-init logs and reboot after clean.
The 'cloud-init status' command allows the user or script to check whether
cloud-init has finished all configuration stages and whether errors
occurred. An optional --wait argument will poll on a 0.25 second interval
until cloud-init configuration is complete. The benefit here is scripts
can block on cloud-init completion before performing post-config tasks.
Diffstat (limited to 'cloudinit/cmd/tests/test_status.py')
| -rw-r--r-- | cloudinit/cmd/tests/test_status.py | 353 | 
1 files changed, 353 insertions, 0 deletions
diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py new file mode 100644 index 00000000..8ec9b5bc --- /dev/null +++ b/cloudinit/cmd/tests/test_status.py @@ -0,0 +1,353 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from collections import namedtuple +import os +from six import StringIO +from textwrap import dedent + +from cloudinit.atomic_helper import write_json +from cloudinit.cmd import status +from cloudinit.util import write_file +from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock + +mypaths = namedtuple('MyPaths', 'run_dir') +myargs = namedtuple('MyArgs', 'long wait') + + +class TestStatus(CiTestCase): + +    def setUp(self): +        super(TestStatus, self).setUp() +        self.new_root = self.tmp_dir() +        self.status_file = self.tmp_path('status.json', self.new_root) +        self.disable_file = self.tmp_path('cloudinit-disable', self.new_root) +        self.paths = mypaths(run_dir=self.new_root) + +        class FakeInit(object): +            paths = self.paths + +            def __init__(self, ds_deps): +                pass + +            def read_cfg(self): +                pass + +        self.init_class = FakeInit + +    def test__is_cloudinit_disabled_false_on_sysvinit(self): +        '''When not in an environment using systemd, return False.''' +        write_file(self.disable_file, '')  # Create the ignored disable file +        (is_disabled, reason) = wrap_and_call( +            'cloudinit.cmd.status', +            {'uses_systemd': False}, +            status._is_cloudinit_disabled, self.disable_file, self.paths) +        self.assertFalse( +            is_disabled, 'expected enabled cloud-init on sysvinit') +        self.assertEqual('Cloud-init enabled on sysvinit', reason) + +    def test__is_cloudinit_disabled_true_on_disable_file(self): +        '''When using systemd and disable_file is present return disabled.''' +        write_file(self.disable_file, '')  # Create observed disable file +        (is_disabled, reason) = wrap_and_call( +            'cloudinit.cmd.status', +            {'uses_systemd': True}, +            status._is_cloudinit_disabled, self.disable_file, self.paths) +        self.assertTrue(is_disabled, 'expected disabled cloud-init') +        self.assertEqual( +            'Cloud-init disabled by {0}'.format(self.disable_file), reason) + +    def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): +        '''Not disabled when using systemd and enabled via commandline.''' +        write_file(self.disable_file, '')  # Create ignored disable file +        (is_disabled, reason) = wrap_and_call( +            'cloudinit.cmd.status', +            {'uses_systemd': True, +             'get_cmdline': 'something cloud-init=enabled else'}, +            status._is_cloudinit_disabled, self.disable_file, self.paths) +        self.assertFalse(is_disabled, 'expected enabled cloud-init') +        self.assertEqual( +            'Cloud-init enabled by kernel command line cloud-init=enabled', +            reason) + +    def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): +        '''When using systemd and disable_file is present return disabled.''' +        (is_disabled, reason) = wrap_and_call( +            'cloudinit.cmd.status', +            {'uses_systemd': True, +             'get_cmdline': 'something cloud-init=disabled else'}, +            status._is_cloudinit_disabled, self.disable_file, self.paths) +        self.assertTrue(is_disabled, 'expected disabled cloud-init') +        self.assertEqual( +            'Cloud-init disabled by kernel parameter cloud-init=disabled', +            reason) + +    def test__is_cloudinit_disabled_true_when_generator_disables(self): +        '''When cloud-init-generator doesn't write enabled file return True.''' +        enabled_file = os.path.join(self.paths.run_dir, 'enabled') +        self.assertFalse(os.path.exists(enabled_file)) +        (is_disabled, reason) = wrap_and_call( +            'cloudinit.cmd.status', +            {'uses_systemd': True, +             'get_cmdline': 'something'}, +            status._is_cloudinit_disabled, self.disable_file, self.paths) +        self.assertTrue(is_disabled, 'expected disabled cloud-init') +        self.assertEqual('Cloud-init disabled by cloud-init-generator', reason) + +    def test_status_returns_not_run(self): +        '''When status.json does not exist yet, return 'not run'.''' +        self.assertFalse( +            os.path.exists(self.status_file), 'Unexpected status.json found') +        cmdargs = myargs(long=False, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        self.assertEqual('status: not run\n', m_stdout.getvalue()) + +    def test_status_returns_disabled_long_on_presence_of_disable_file(self): +        '''When cloudinit is disabled, return disabled reason.''' + +        checked_files = [] + +        def fakeexists(filepath): +            checked_files.append(filepath) +            status_file = os.path.join(self.paths.run_dir, 'status.json') +            return bool(not filepath == status_file) + +        cmdargs = myargs(long=True, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'os.path.exists': {'side_effect': fakeexists}, +                 '_is_cloudinit_disabled': (True, 'disabled for some reason'), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        self.assertEqual( +            [os.path.join(self.paths.run_dir, 'status.json')], +            checked_files) +        expected = dedent('''\ +            status: disabled +            detail: +            disabled for some reason +        ''') +        self.assertEqual(expected, m_stdout.getvalue()) + +    def test_status_returns_running(self): +        '''Report running when status file exists but isn't finished.''' +        write_json(self.status_file, {'v1': {'init': {'finished': None}}}) +        cmdargs = myargs(long=False, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        self.assertEqual('status: running\n', m_stdout.getvalue()) + +    def test_status_returns_done(self): +        '''Reports done when stage is None and all stages are finished.''' +        write_json( +            self.status_file, +            {'v1': {'stage': None, +                    'datasource': ( +                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' +                        '[dsmode=net]'), +                    'blah': {'finished': 123.456}, +                    'init': {'errors': [], 'start': 124.567, +                             'finished': 125.678}, +                    'init-local': {'start': 123.45, 'finished': 123.46}}}) +        cmdargs = myargs(long=False, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        self.assertEqual('status: done\n', m_stdout.getvalue()) + +    def test_status_returns_done_long(self): +        '''Long format of done status includes datasource info.''' +        write_json( +            self.status_file, +            {'v1': {'stage': None, +                    'datasource': ( +                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' +                        '[dsmode=net]'), +                    'init': {'start': 124.567, 'finished': 125.678}, +                    'init-local': {'start': 123.45, 'finished': 123.46}}}) +        cmdargs = myargs(long=True, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        expected = dedent('''\ +            status: done +            time: Thu, 01 Jan 1970 00:02:05 +0000 +            detail: +            DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] +        ''') +        self.assertEqual(expected, m_stdout.getvalue()) + +    def test_status_on_errors(self): +        '''Reports error when any stage has errors.''' +        write_json( +            self.status_file, +            {'v1': {'stage': None, +                    'blah': {'errors': [], 'finished': 123.456}, +                    'init': {'errors': ['error1'], 'start': 124.567, +                             'finished': 125.678}, +                    'init-local': {'start': 123.45, 'finished': 123.46}}}) +        cmdargs = myargs(long=False, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(1, retcode) +        self.assertEqual('status: error\n', m_stdout.getvalue()) + +    def test_status_on_errors_long(self): +        '''Long format of error status includes all error messages.''' +        write_json( +            self.status_file, +            {'v1': {'stage': None, +                    'datasource': ( +                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' +                        '[dsmode=net]'), +                    'init': {'errors': ['error1'], 'start': 124.567, +                             'finished': 125.678}, +                    'init-local': {'errors': ['error2', 'error3'], +                                   'start': 123.45, 'finished': 123.46}}}) +        cmdargs = myargs(long=True, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(1, retcode) +        expected = dedent('''\ +            status: error +            time: Thu, 01 Jan 1970 00:02:05 +0000 +            detail: +            error1 +            error2 +            error3 +        ''') +        self.assertEqual(expected, m_stdout.getvalue()) + +    def test_status_returns_running_long_format(self): +        '''Long format reports the stage in which we are running.''' +        write_json( +            self.status_file, +            {'v1': {'stage': 'init', +                    'init': {'start': 124.456, 'finished': None}, +                    'init-local': {'start': 123.45, 'finished': 123.46}}}) +        cmdargs = myargs(long=True, wait=False) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        expected = dedent('''\ +            status: running +            time: Thu, 01 Jan 1970 00:02:04 +0000 +            detail: +            Running in stage: init +        ''') +        self.assertEqual(expected, m_stdout.getvalue()) + +    def test_status_wait_blocks_until_done(self): +        '''Specifying wait will poll every 1/4 second until done state.''' +        running_json = { +            'v1': {'stage': 'init', +                   'init': {'start': 124.456, 'finished': None}, +                   'init-local': {'start': 123.45, 'finished': 123.46}}} +        done_json = { +            'v1': {'stage': None, +                   'init': {'start': 124.456, 'finished': 125.678}, +                   'init-local': {'start': 123.45, 'finished': 123.46}}} + +        self.sleep_calls = 0 + +        def fake_sleep(interval): +            self.assertEqual(0.25, interval) +            self.sleep_calls += 1 +            if self.sleep_calls == 2: +                write_json(self.status_file, running_json) +            elif self.sleep_calls == 3: +                write_json(self.status_file, done_json) + +        cmdargs = myargs(long=False, wait=True) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'sleep': {'side_effect': fake_sleep}, +                 '_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(0, retcode) +        self.assertEqual(4, self.sleep_calls) +        self.assertEqual('....\nstatus: done\n', m_stdout.getvalue()) + +    def test_status_wait_blocks_until_error(self): +        '''Specifying wait will poll every 1/4 second until error state.''' +        running_json = { +            'v1': {'stage': 'init', +                   'init': {'start': 124.456, 'finished': None}, +                   'init-local': {'start': 123.45, 'finished': 123.46}}} +        error_json = { +            'v1': {'stage': None, +                   'init': {'errors': ['error1'], 'start': 124.456, +                            'finished': 125.678}, +                   'init-local': {'start': 123.45, 'finished': 123.46}}} + +        self.sleep_calls = 0 + +        def fake_sleep(interval): +            self.assertEqual(0.25, interval) +            self.sleep_calls += 1 +            if self.sleep_calls == 2: +                write_json(self.status_file, running_json) +            elif self.sleep_calls == 3: +                write_json(self.status_file, error_json) + +        cmdargs = myargs(long=False, wait=True) +        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +            retcode = wrap_and_call( +                'cloudinit.cmd.status', +                {'sleep': {'side_effect': fake_sleep}, +                 '_is_cloudinit_disabled': (False, ''), +                 'Init': {'side_effect': self.init_class}}, +                status.handle_status_args, 'ignored', cmdargs) +        self.assertEqual(1, retcode) +        self.assertEqual(4, self.sleep_calls) +        self.assertEqual('....\nstatus: error\n', m_stdout.getvalue()) + +    def test_status_main(self): +        '''status.main can be run as a standalone script.''' +        write_json(self.status_file, {'v1': {'init': {'finished': None}}}) +        with self.assertRaises(SystemExit) as context_manager: +            with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: +                wrap_and_call( +                    'cloudinit.cmd.status', +                    {'sys.argv': {'new': ['status']}, +                     '_is_cloudinit_disabled': (False, ''), +                     'Init': {'side_effect': self.init_class}}, +                    status.main) +        self.assertEqual(0, context_manager.exception.code) +        self.assertEqual('status: running\n', m_stdout.getvalue()) + +# vi: ts=4 expandtab syntax=python  | 
