diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/helpers.py | 22 | ||||
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 11 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 97 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 8 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 157 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_user_data_normalize.py | 10 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ca_certs.py | 50 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_growpart.py | 255 | ||||
-rw-r--r-- | tests/unittests/test_sshutil.py | 101 |
9 files changed, 648 insertions, 63 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 92540b0c..904677f1 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -2,6 +2,9 @@ import os import sys import unittest +from contextlib import contextmanager + +from mocker import Mocker from mocker import MockerTestCase from cloudinit import helpers as ch @@ -31,6 +34,17 @@ else: pass +@contextmanager +def mocker(verify_calls=True): + m = Mocker() + try: + yield m + finally: + m.restore() + if verify_calls: + m.verify() + + # Makes the old path start # with new base instead of whatever # it previously had @@ -168,3 +182,11 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): trap_func = retarget_many_wrapper(new_root, 1, func) setattr(mod, f, trap_func) self.patched_funcs.append((mod, f, func)) + + +def populate_dir(path, files): + os.makedirs(path) + for (name, content) in files.iteritems(): + with open(os.path.join(path, name), "w") as fp: + fp.write(content) + fp.close() diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index ebc0bd51..da52f15b 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -1,11 +1,13 @@ """Tests of the built-in user data handlers.""" import os +import unittest from mocker import MockerTestCase from cloudinit import handlers from cloudinit import helpers +from cloudinit import util from cloudinit.handlers import upstart_job @@ -33,7 +35,9 @@ class TestBuiltins(MockerTestCase): None, None, None) self.assertEquals(0, len(os.listdir(up_root))) + @unittest.skip("until LP: #1124384 fixed") def test_upstart_frequency_single(self): + # files should be written out when frequency is ! per-instance c_root = self.makeDir() up_root = self.makeDir() paths = helpers.Paths({ @@ -41,9 +45,12 @@ class TestBuiltins(MockerTestCase): 'upstart_dir': up_root, }) freq = PER_INSTANCE + + mock_subp = self.mocker.replace(util.subp, passthrough=False) + mock_subp(["initctl", "reload-configuration"], capture=False) + self.mocker.replay() + h = upstart_job.UpstartJobPartHandler(paths) - # No files should be written out when - # the frequency is ! per-instance h.handle_part('', handlers.CONTENT_START, None, None, None) h.handle_part('blah', 'text/upstart-job', diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index aa5b98ed..930086db 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -11,6 +11,7 @@ from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit import util +from tests.unittests import helpers as unit_helpers PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' EC2_META = { @@ -89,23 +90,22 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) - provided_name = dev_name[len('/dev/'):] - provided_name = "s" + provided_name[1:] - find_mock(mocker.ARGS) - my_mock.result([provided_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - my_mock.restore() - self.assertEquals(dev_name, device) + with unit_helpers.mocker() as my_mock: + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + provided_name = dev_name[len('/dev/'):] + provided_name = "s" + provided_name[1:] + find_mock(mocker.ARGS) + my_mock.result([provided_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_os_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -122,19 +122,18 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) - find_mock(mocker.ARGS) - my_mock.result([dev_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - my_mock.restore() - self.assertEquals(dev_name, device) + with unit_helpers.mocker() as my_mock: + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + find_mock(mocker.ARGS) + my_mock.result([dev_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_ec2_remap(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -156,17 +155,16 @@ class TestConfigDriveDataSource(MockerTestCase): 'root2k': None, } for name, dev_name in name_tests.items(): - my_mock = mocker.Mocker() - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() - device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) - my_mock.restore() + with unit_helpers.mocker(verify_calls=False) as my_mock: + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dev_ec2_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -259,19 +257,25 @@ class TestConfigDriveDataSource(MockerTestCase): ds.read_config_drive_dir, my_d) def test_find_candidates(self): - devs_with_answers = { - "TYPE=vfat": [], - "TYPE=iso9660": ["/dev/vdb"], - "LABEL=config-2": ["/dev/vdb"], - } + devs_with_answers = {} def my_devs_with(criteria): return devs_with_answers[criteria] + def my_is_partition(dev): + return dev[-1] in "0123456789" and not dev.startswith("sr") + try: orig_find_devs_with = util.find_devs_with util.find_devs_with = my_devs_with + orig_is_partition = util.is_partition + util.is_partition = my_is_partition + + devs_with_answers = {"TYPE=vfat": [], + "TYPE=iso9660": ["/dev/vdb"], + "LABEL=config-2": ["/dev/vdb"], + } self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) # add a vfat item @@ -287,6 +291,7 @@ class TestConfigDriveDataSource(MockerTestCase): finally: util.find_devs_with = orig_find_devs_with + util.is_partition = orig_is_partition def test_pubkeys_v2(self): """Verify that public-keys work in config-drive-v2.""" diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 85e6add0..b56fea82 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -3,6 +3,7 @@ import os from cloudinit.sources import DataSourceMAAS from cloudinit import url_helper +from tests.unittests.helpers import populate_dir from mocker import MockerTestCase @@ -137,11 +138,4 @@ class TestMAASDataSource(MockerTestCase): pass -def populate_dir(seed_dir, files): - os.mkdir(seed_dir) - for (name, content) in files.iteritems(): - with open(os.path.join(seed_dir, name), "w") as fp: - fp.write(content) - fp.close() - # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py new file mode 100644 index 00000000..62fc5358 --- /dev/null +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -0,0 +1,157 @@ +from cloudinit import helpers +from cloudinit.sources import DataSourceNoCloud +from cloudinit import util +from tests.unittests.helpers import populate_dir + +from mocker import MockerTestCase +import os +import yaml + + +class TestNoCloudDataSource(MockerTestCase): + + def setUp(self): + self.tmp = self.makeDir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + self.cmdline = "root=TESTCMDLINE" + + self.unapply = [] + self.apply_patches([(util, 'get_cmdline', self._getcmdline)]) + super(TestNoCloudDataSource, self).setUp() + + def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) + super(TestNoCloudDataSource, self).setUp() + + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def _getcmdline(self): + return self.cmdline + + def test_nocloud_seed_dir(self): + md = {'instance-id': 'IID', 'dsmode': 'local'} + ud = "USER_DATA_HERE" + populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), + {'user-data': ud, 'meta-data': yaml.safe_dump(md)}) + + sys_cfg = { + 'datasource': {'NoCloud': {'fs_label': None}} + } + + ds = DataSourceNoCloud.DataSourceNoCloud + + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, ud) + self.assertEqual(dsrc.metadata, md) + self.assertTrue(ret) + + def test_fs_label(self): + #find_devs_with should not be called ff fs_label is None + ds = DataSourceNoCloud.DataSourceNoCloud + + class PsuedoException(Exception): + pass + + def my_find_devs_with(*args, **kwargs): + _f = (args, kwargs) + raise PsuedoException + + self.apply_patches([(util, 'find_devs_with', my_find_devs_with)]) + + # by default, NoCloud should search for filesystems by label + sys_cfg = {'datasource': {'NoCloud': {}}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertRaises(PsuedoException, dsrc.get_data) + + # but disabling searching should just end up with None found + sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertFalse(ret) + + def test_no_datasource_expected(self): + #no source should be found if no cmdline, config, and fs_label=None + sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} + + ds = DataSourceNoCloud.DataSourceNoCloud + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertFalse(dsrc.get_data()) + + def test_seed_in_config(self): + ds = DataSourceNoCloud.DataSourceNoCloud + + data = { + 'fs_label': None, + 'meta-data': {'instance-id': 'IID'}, + 'user-data': "USER_DATA_RAW", + } + + sys_cfg = {'datasource': {'NoCloud': data}} + dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW") + self.assertEqual(dsrc.metadata.get('instance-id'), 'IID') + self.assertTrue(ret) + + +class TestParseCommandLineData(MockerTestCase): + + def test_parse_cmdline_data_valid(self): + ds_id = "ds=nocloud" + pairs = ( + ("root=/dev/sda1 %(ds_id)s", {}), + ("%(ds_id)s; root=/dev/foo", {}), + ("%(ds_id)s", {}), + ("%(ds_id)s;", {}), + ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}), + ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost", + {'seedfrom': 'SEED', 'local-hostname': 'xhost'}), + ("%(ds_id)s;h=xhost", + {'local-hostname': 'xhost'}), + ("%(ds_id)s;h=xhost;i=IID", + {'local-hostname': 'xhost', 'instance-id': 'IID'}), + ) + + for (fmt, expected) in pairs: + fill = {} + cmdline = fmt % {'ds_id': ds_id} + ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, + cmdline=cmdline) + self.assertEqual(expected, fill) + self.assertTrue(ret) + + def test_parse_cmdline_data_none(self): + ds_id = "ds=foo" + cmdlines = ( + "root=/dev/sda1 ro", + "console=/dev/ttyS0 root=/dev/foo", + "", + "ds=foocloud", + "ds=foo-net", + "ds=nocloud;s=SEED", + ) + + for cmdline in cmdlines: + fill = {} + ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill, + cmdline=cmdline) + self.assertEqual(fill, {}) + self.assertFalse(ret) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py index 5d9d4311..50398c74 100644 --- a/tests/unittests/test_distros/test_user_data_normalize.py +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -173,26 +173,29 @@ class TestUGNormalize(MockerTestCase): 'users': 'default' } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertNotIn('bob', users) # Bob is not the default now, zetta is self.assertIn('zetta', users) + self.assertTrue(users['zetta']['default']) self.assertNotIn('default', users) ug_cfg = { 'user': 'zetta', 'users': 'default, joe' } (users, _groups) = self._norm(ug_cfg, distro) - self.assertIn('bob', users) + self.assertNotIn('bob', users) # Bob is not the default now, zetta is self.assertIn('joe', users) self.assertIn('zetta', users) + self.assertTrue(users['zetta']['default']) self.assertNotIn('default', users) ug_cfg = { 'user': 'zetta', 'users': ['bob', 'joe'] } (users, _groups) = self._norm(ug_cfg, distro) - self.assertNotIn('bob', users) + self.assertIn('bob', users) self.assertIn('joe', users) self.assertIn('zetta', users) + self.assertTrue(users['zetta']['default']) ug_cfg = { 'user': 'zetta', 'users': { @@ -204,6 +207,7 @@ class TestUGNormalize(MockerTestCase): self.assertIn('bob', users) self.assertIn('joe', users) self.assertIn('zetta', users) + self.assertTrue(users['zetta']['default']) ug_cfg = { 'user': 'zetta', } diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index d73c9fa9..0558023a 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -138,15 +138,47 @@ class TestAddCaCerts(MockerTestCase): self.mocker.replay() cc_ca_certs.add_ca_certs([]) - def test_single_cert(self): - """Test adding a single certificate to the trusted CAs.""" + def test_single_cert_trailing_cr(self): + """Test adding a single certificate to the trusted CAs + when existing ca-certificates has trailing newline""" cert = "CERT1\nLINE2\nLINE3" + ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" + expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" + + mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", + cert, mode=0644) + + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + + mock_write("/etc/ca-certificates.conf", expected, omode="wb") + self.mocker.replay() + + cc_ca_certs.add_ca_certs([cert]) + + def test_single_cert_no_trailing_cr(self): + """Test adding a single certificate to the trusted CAs + when existing ca-certificates has no trailing newline""" + cert = "CERT1\nLINE2\nLINE3" + + ca_certs_content = "line1\nline2\nline3" + mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", cert, mode=0644) + + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="ab") + "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), + omode="wb") self.mocker.replay() cc_ca_certs.add_ca_certs([cert]) @@ -157,10 +189,18 @@ class TestAddCaCerts(MockerTestCase): expected_cert_file = "\n".join(certs) mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", expected_cert_file, mode=0644) - mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="ab") + + ca_certs_content = "line1\nline2\nline3" + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + + out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt") + mock_write("/etc/ca-certificates.conf", out, omode="wb") + self.mocker.replay() cc_ca_certs.add_ca_certs(certs) diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py new file mode 100644 index 00000000..b1b872b0 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_growpart.py @@ -0,0 +1,255 @@ +from mocker import MockerTestCase + +from cloudinit import cloud +from cloudinit import util + +from cloudinit.config import cc_growpart + +import errno +import logging +import os +import re + +# growpart: +# mode: auto # off, on, auto, 'growpart', 'parted' +# devices: ['root'] + +HELP_PARTED_NO_RESIZE = """ +Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] +Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in +interactive mode. + +OPTIONs: +<SNIP> + +COMMANDs: +<SNIP> + quit exit program + rescue START END rescue a lost partition near START + and END + resize NUMBER START END resize partition NUMBER and its file + system + rm NUMBER delete partition NUMBER +<SNIP> +Report bugs to bug-parted@gnu.org +""" + +HELP_PARTED_RESIZE = """ +Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] +Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in +interactive mode. + +OPTIONs: +<SNIP> + +COMMANDs: +<SNIP> + quit exit program + rescue START END rescue a lost partition near START + and END + resize NUMBER START END resize partition NUMBER and its file + system + resizepart NUMBER END resize partition NUMBER + rm NUMBER delete partition NUMBER +<SNIP> +Report bugs to bug-parted@gnu.org +""" + +HELP_GROWPART_RESIZE = """ +growpart disk partition + rewrite partition table so that partition takes up all the space it can + options: + -h | --help print Usage and exit +<SNIP> + -u | --update R update the the kernel partition table info after growing + this requires kernel support and 'partx --update' + R is one of: + - 'auto' : [default] update partition if possible +<SNIP> + Example: + - growpart /dev/sda 1 + Resize partition 1 on /dev/sda +""" + +HELP_GROWPART_NO_RESIZE = """ +growpart disk partition + rewrite partition table so that partition takes up all the space it can + options: + -h | --help print Usage and exit +<SNIP> + Example: + - growpart /dev/sda 1 + Resize partition 1 on /dev/sda +""" + + +class TestDisabled(MockerTestCase): + def setUp(self): + super(TestDisabled, self).setUp() + self.name = "growpart" + self.cloud_init = None + self.log = logging.getLogger("TestDisabled") + self.args = [] + + self.handle = cc_growpart.handle + + def test_mode_off(self): + #Test that nothing is done if mode is off. + + # this really only verifies that resizer_factory isn't called + config = {'growpart': {'mode': 'off'}} + self.mocker.replace(cc_growpart.resizer_factory, + passthrough=False) + self.mocker.replay() + + self.handle(self.name, config, self.cloud_init, self.log, self.args) + + +class TestConfig(MockerTestCase): + def setUp(self): + super(TestConfig, self).setUp() + self.name = "growpart" + self.paths = None + self.cloud = cloud.Cloud(None, self.paths, None, None, None) + self.log = logging.getLogger("TestConfig") + self.args = [] + os.environ = {} + + self.cloud_init = None + self.handle = cc_growpart.handle + + # Order must be correct + self.mocker.order() + + def test_no_resizers_auto_is_fine(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['parted', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_PARTED_NO_RESIZE, "")) + subp(['growpart', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) + self.mocker.replay() + + config = {'growpart': {'mode': 'auto'}} + self.handle(self.name, config, self.cloud_init, self.log, self.args) + + def test_no_resizers_mode_growpart_is_exception(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['growpart', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) + self.mocker.replay() + + config = {'growpart': {'mode': "growpart"}} + self.assertRaises(ValueError, self.handle, self.name, config, + self.cloud_init, self.log, self.args) + + def test_mode_auto_prefers_parted(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['parted', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_PARTED_RESIZE, "")) + self.mocker.replay() + + ret = cc_growpart.resizer_factory(mode="auto") + self.assertTrue(isinstance(ret, cc_growpart.ResizeParted)) + + def test_handle_with_no_growpart_entry(self): + #if no 'growpart' entry in config, then mode=auto should be used + + myresizer = object() + + factory = self.mocker.replace(cc_growpart.resizer_factory, + passthrough=False) + rsdevs = self.mocker.replace(cc_growpart.resize_devices, + passthrough=False) + factory("auto") + self.mocker.result(myresizer) + rsdevs(myresizer, ["/"]) + self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),)) + self.mocker.replay() + + try: + orig_resizers = cc_growpart.RESIZERS + cc_growpart.RESIZERS = (('mysizer', object),) + self.handle(self.name, {}, self.cloud_init, self.log, self.args) + finally: + cc_growpart.RESIZERS = orig_resizers + + +class TestResize(MockerTestCase): + def setUp(self): + super(TestResize, self).setUp() + self.name = "growpart" + self.log = logging.getLogger("TestResize") + + # Order must be correct + self.mocker.order() + + def test_simple_devices(self): + #test simple device list + # this patches out devent2dev, os.stat, and device_part_info + # so in the end, doesn't test a lot + devs = ["/dev/XXda1", "/dev/YYda2"] + devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L, + st_nlink=1, st_uid=0, st_gid=6, st_size=0, + st_atime=0, st_mtime=0, st_ctime=0) + enoent = ["/dev/NOENT"] + real_stat = os.stat + resize_calls = [] + + class myresizer(object): + def resize(self, diskdev, partnum, partdev): + resize_calls.append((diskdev, partnum, partdev)) + if partdev == "/dev/YYda2": + return (1024, 2048) + return (1024, 1024) # old size, new size + + def mystat(path): + if path in devs: + return devstat_ret + if path in enoent: + e = OSError("%s: does not exist" % path) + e.errno = errno.ENOENT + raise e + return real_stat(path) + + try: + opinfo = cc_growpart.device_part_info + cc_growpart.device_part_info = simple_device_part_info + os.stat = mystat + + resized = cc_growpart.resize_devices(myresizer(), devs + enoent) + + def find(name, res): + for f in res: + if f[0] == name: + return f + return None + + self.assertEqual(cc_growpart.RESIZE.NOCHANGE, + find("/dev/XXda1", resized)[1]) + self.assertEqual(cc_growpart.RESIZE.CHANGED, + find("/dev/YYda2", resized)[1]) + self.assertEqual(cc_growpart.RESIZE.SKIPPED, + find(enoent[0], resized)[1]) + #self.assertEqual(resize_calls, + #[("/dev/XXda", "1", "/dev/XXda1"), + #("/dev/YYda", "2", "/dev/YYda2")]) + finally: + cc_growpart.device_part_info = opinfo + os.stat = real_stat + + +def simple_device_part_info(devpath): + # simple stupid return (/dev/vda, 1) for /dev/vda + ret = re.search("([^0-9]*)([0-9]*)$", devpath) + x = (ret.group(1), ret.group(2)) + return x + + +class Bunch: + st_mode = None # fix pylint complaint + + def __init__(self, **kwds): + self.__dict__.update(kwds) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py new file mode 100644 index 00000000..d8662cac --- /dev/null +++ b/tests/unittests/test_sshutil.py @@ -0,0 +1,101 @@ +from cloudinit import ssh_util +from unittest import TestCase + + +VALID_CONTENT = { + 'dsa': ( + "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" + "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" + "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" + "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv" + "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4" + "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7" + "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P" + "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE" + "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" + "5z7u2rVAlDw==" + ), + 'ecdsa': ( + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" + "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" + "Ql7lc2leWL7CY=" + ), + 'rsa': ( + "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" + "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" + "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" + "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT" + "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" + "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" + ), +} + +TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," + 'command="echo \'Please login as the user \"ubuntu\" rather than the' + 'user \"root\".\';echo;sleep 10"') + + +class TestAuthKeyLineParser(TestCase): + def test_simple_parse(self): + # test key line with common 3 fields (keytype, base64, comment) + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_no_comment(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + line = ' '.join((ktype, content,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertFalse(key.comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_keyoptions(self): + # test key line with options in it + parser = ssh_util.AuthKeyLineParser() + options = TEST_OPTIONS + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((options, ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertEqual(key.options, options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_options_passed_in(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + + baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host")) + myopts = "no-port-forwarding,no-agent-forwarding" + + key = parser.parse("allowedopt" + " " + baseline) + self.assertEqual(key.options, "allowedopt") + + key = parser.parse("overridden_opt " + baseline, options=myopts) + self.assertEqual(key.options, myopts) + + def test_parse_invalid_keytype(self): + parser = ssh_util.AuthKeyLineParser() + key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']])) + + self.assertFalse(key.valid()) + + +# vi: ts=4 expandtab |