summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/helpers.py22
-rw-r--r--tests/unittests/test_builtin_handlers.py11
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py97
-rw-r--r--tests/unittests/test_datasource/test_maas.py8
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py157
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py10
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py50
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py255
-rw-r--r--tests/unittests/test_sshutil.py101
9 files changed, 648 insertions, 63 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 92540b0c..904677f1 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -2,6 +2,9 @@ import os
import sys
import unittest
+from contextlib import contextmanager
+
+from mocker import Mocker
from mocker import MockerTestCase
from cloudinit import helpers as ch
@@ -31,6 +34,17 @@ else:
pass
+@contextmanager
+def mocker(verify_calls=True):
+ m = Mocker()
+ try:
+ yield m
+ finally:
+ m.restore()
+ if verify_calls:
+ m.verify()
+
+
# Makes the old path start
# with new base instead of whatever
# it previously had
@@ -168,3 +182,11 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
trap_func = retarget_many_wrapper(new_root, 1, func)
setattr(mod, f, trap_func)
self.patched_funcs.append((mod, f, func))
+
+
+def populate_dir(path, files):
+ os.makedirs(path)
+ for (name, content) in files.iteritems():
+ with open(os.path.join(path, name), "w") as fp:
+ fp.write(content)
+ fp.close()
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index ebc0bd51..da52f15b 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,11 +1,13 @@
"""Tests of the built-in user data handlers."""
import os
+import unittest
from mocker import MockerTestCase
from cloudinit import handlers
from cloudinit import helpers
+from cloudinit import util
from cloudinit.handlers import upstart_job
@@ -33,7 +35,9 @@ class TestBuiltins(MockerTestCase):
None, None, None)
self.assertEquals(0, len(os.listdir(up_root)))
+ @unittest.skip("until LP: #1124384 fixed")
def test_upstart_frequency_single(self):
+ # files should be written out when frequency is ! per-instance
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({
@@ -41,9 +45,12 @@ class TestBuiltins(MockerTestCase):
'upstart_dir': up_root,
})
freq = PER_INSTANCE
+
+ mock_subp = self.mocker.replace(util.subp, passthrough=False)
+ mock_subp(["initctl", "reload-configuration"], capture=False)
+ self.mocker.replay()
+
h = upstart_job.UpstartJobPartHandler(paths)
- # No files should be written out when
- # the frequency is ! per-instance
h.handle_part('', handlers.CONTENT_START,
None, None, None)
h.handle_part('blah', 'text/upstart-job',
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index aa5b98ed..930086db 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -11,6 +11,7 @@ from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
+from tests.unittests import helpers as unit_helpers
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -89,23 +90,22 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- provided_name = dev_name[len('/dev/'):]
- provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -122,19 +122,18 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -156,17 +155,16 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
- my_mock.restore()
+ with unit_helpers.mocker(verify_calls=False) as my_mock:
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -259,19 +257,25 @@ class TestConfigDriveDataSource(MockerTestCase):
ds.read_config_drive_dir, my_d)
def test_find_candidates(self):
- devs_with_answers = {
- "TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ devs_with_answers = {}
def my_devs_with(criteria):
return devs_with_answers[criteria]
+ def my_is_partition(dev):
+ return dev[-1] in "0123456789" and not dev.startswith("sr")
+
try:
orig_find_devs_with = util.find_devs_with
util.find_devs_with = my_devs_with
+ orig_is_partition = util.is_partition
+ util.is_partition = my_is_partition
+
+ devs_with_answers = {"TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -287,6 +291,7 @@ class TestConfigDriveDataSource(MockerTestCase):
finally:
util.find_devs_with = orig_find_devs_with
+ util.is_partition = orig_is_partition
def test_pubkeys_v2(self):
"""Verify that public-keys work in config-drive-v2."""
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 85e6add0..b56fea82 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,6 +3,7 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
+from tests.unittests.helpers import populate_dir
from mocker import MockerTestCase
@@ -137,11 +138,4 @@ class TestMAASDataSource(MockerTestCase):
pass
-def populate_dir(seed_dir, files):
- os.mkdir(seed_dir)
- for (name, content) in files.iteritems():
- with open(os.path.join(seed_dir, name), "w") as fp:
- fp.write(content)
- fp.close()
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
new file mode 100644
index 00000000..62fc5358
--- /dev/null
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -0,0 +1,157 @@
+from cloudinit import helpers
+from cloudinit.sources import DataSourceNoCloud
+from cloudinit import util
+from tests.unittests.helpers import populate_dir
+
+from mocker import MockerTestCase
+import os
+import yaml
+
+
+class TestNoCloudDataSource(MockerTestCase):
+
+ def setUp(self):
+ self.tmp = self.makeDir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.cmdline = "root=TESTCMDLINE"
+
+ self.unapply = []
+ self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _getcmdline(self):
+ return self.cmdline
+
+ def test_nocloud_seed_dir(self):
+ md = {'instance-id': 'IID', 'dsmode': 'local'}
+ ud = "USER_DATA_HERE"
+ populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': ud, 'meta-data': yaml.safe_dump(md)})
+
+ sys_cfg = {
+ 'datasource': {'NoCloud': {'fs_label': None}}
+ }
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, ud)
+ self.assertEqual(dsrc.metadata, md)
+ self.assertTrue(ret)
+
+ def test_fs_label(self):
+ #find_devs_with should not be called ff fs_label is None
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ class PsuedoException(Exception):
+ pass
+
+ def my_find_devs_with(*args, **kwargs):
+ _f = (args, kwargs)
+ raise PsuedoException
+
+ self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+
+ # by default, NoCloud should search for filesystems by label
+ sys_cfg = {'datasource': {'NoCloud': {}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(PsuedoException, dsrc.get_data)
+
+ # but disabling searching should just end up with None found
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+
+ def test_no_datasource_expected(self):
+ #no source should be found if no cmdline, config, and fs_label=None
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertFalse(dsrc.get_data())
+
+ def test_seed_in_config(self):
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ data = {
+ 'fs_label': None,
+ 'meta-data': {'instance-id': 'IID'},
+ 'user-data': "USER_DATA_RAW",
+ }
+
+ sys_cfg = {'datasource': {'NoCloud': data}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
+ self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
+ self.assertTrue(ret)
+
+
+class TestParseCommandLineData(MockerTestCase):
+
+ def test_parse_cmdline_data_valid(self):
+ ds_id = "ds=nocloud"
+ pairs = (
+ ("root=/dev/sda1 %(ds_id)s", {}),
+ ("%(ds_id)s; root=/dev/foo", {}),
+ ("%(ds_id)s", {}),
+ ("%(ds_id)s;", {}),
+ ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
+ ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
+ {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost",
+ {'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost;i=IID",
+ {'local-hostname': 'xhost', 'instance-id': 'IID'}),
+ )
+
+ for (fmt, expected) in pairs:
+ fill = {}
+ cmdline = fmt % {'ds_id': ds_id}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(expected, fill)
+ self.assertTrue(ret)
+
+ def test_parse_cmdline_data_none(self):
+ ds_id = "ds=foo"
+ cmdlines = (
+ "root=/dev/sda1 ro",
+ "console=/dev/ttyS0 root=/dev/foo",
+ "",
+ "ds=foocloud",
+ "ds=foo-net",
+ "ds=nocloud;s=SEED",
+ )
+
+ for cmdline in cmdlines:
+ fill = {}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(fill, {})
+ self.assertFalse(ret)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index 5d9d4311..50398c74 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -173,26 +173,29 @@ class TestUGNormalize(MockerTestCase):
'users': 'default'
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': 'default, joe'
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': ['bob', 'joe']
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertNotIn('bob', users)
+ self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
'users': {
@@ -204,6 +207,7 @@ class TestUGNormalize(MockerTestCase):
self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
}
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index d73c9fa9..0558023a 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -138,15 +138,47 @@ class TestAddCaCerts(MockerTestCase):
self.mocker.replay()
cc_ca_certs.add_ca_certs([])
- def test_single_cert(self):
- """Test adding a single certificate to the trusted CAs."""
+ def test_single_cert_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has trailing newline"""
cert = "CERT1\nLINE2\nLINE3"
+ ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
+ expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
+
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
+ mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ mock_write("/etc/ca-certificates.conf", expected, omode="wb")
+ self.mocker.replay()
+
+ cc_ca_certs.add_ca_certs([cert])
+
+ def test_single_cert_no_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has no trailing newline"""
+ cert = "CERT1\nLINE2\nLINE3"
+
+ ca_certs_content = "line1\nline2\nline3"
+
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+ "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
+ omode="wb")
self.mocker.replay()
cc_ca_certs.add_ca_certs([cert])
@@ -157,10 +189,18 @@ class TestAddCaCerts(MockerTestCase):
expected_cert_file = "\n".join(certs)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
- mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+
+ ca_certs_content = "line1\nline2\nline3"
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
+ mock_write("/etc/ca-certificates.conf", out, omode="wb")
+
self.mocker.replay()
cc_ca_certs.add_ca_certs(certs)
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
new file mode 100644
index 00000000..b1b872b0
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -0,0 +1,255 @@
+from mocker import MockerTestCase
+
+from cloudinit import cloud
+from cloudinit import util
+
+from cloudinit.config import cc_growpart
+
+import errno
+import logging
+import os
+import re
+
+# growpart:
+# mode: auto # off, on, auto, 'growpart', 'parted'
+# devices: ['root']
+
+HELP_PARTED_NO_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_PARTED_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ resizepart NUMBER END resize partition NUMBER
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_GROWPART_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ -u | --update R update the the kernel partition table info after growing
+ this requires kernel support and 'partx --update'
+ R is one of:
+ - 'auto' : [default] update partition if possible
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+HELP_GROWPART_NO_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+
+class TestDisabled(MockerTestCase):
+ def setUp(self):
+ super(TestDisabled, self).setUp()
+ self.name = "growpart"
+ self.cloud_init = None
+ self.log = logging.getLogger("TestDisabled")
+ self.args = []
+
+ self.handle = cc_growpart.handle
+
+ def test_mode_off(self):
+ #Test that nothing is done if mode is off.
+
+ # this really only verifies that resizer_factory isn't called
+ config = {'growpart': {'mode': 'off'}}
+ self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ self.mocker.replay()
+
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+
+class TestConfig(MockerTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.name = "growpart"
+ self.paths = None
+ self.cloud = cloud.Cloud(None, self.paths, None, None, None)
+ self.log = logging.getLogger("TestConfig")
+ self.args = []
+ os.environ = {}
+
+ self.cloud_init = None
+ self.handle = cc_growpart.handle
+
+ # Order must be correct
+ self.mocker.order()
+
+ def test_no_resizers_auto_is_fine(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_NO_RESIZE, ""))
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+ def test_no_resizers_mode_growpart_is_exception(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
+
+ def test_mode_auto_prefers_parted(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_RESIZE, ""))
+ self.mocker.replay()
+
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertTrue(isinstance(ret, cc_growpart.ResizeParted))
+
+ def test_handle_with_no_growpart_entry(self):
+ #if no 'growpart' entry in config, then mode=auto should be used
+
+ myresizer = object()
+
+ factory = self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ rsdevs = self.mocker.replace(cc_growpart.resize_devices,
+ passthrough=False)
+ factory("auto")
+ self.mocker.result(myresizer)
+ rsdevs(myresizer, ["/"])
+ self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
+ self.mocker.replay()
+
+ try:
+ orig_resizers = cc_growpart.RESIZERS
+ cc_growpart.RESIZERS = (('mysizer', object),)
+ self.handle(self.name, {}, self.cloud_init, self.log, self.args)
+ finally:
+ cc_growpart.RESIZERS = orig_resizers
+
+
+class TestResize(MockerTestCase):
+ def setUp(self):
+ super(TestResize, self).setUp()
+ self.name = "growpart"
+ self.log = logging.getLogger("TestResize")
+
+ # Order must be correct
+ self.mocker.order()
+
+ def test_simple_devices(self):
+ #test simple device list
+ # this patches out devent2dev, os.stat, and device_part_info
+ # so in the end, doesn't test a lot
+ devs = ["/dev/XXda1", "/dev/YYda2"]
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ st_nlink=1, st_uid=0, st_gid=6, st_size=0,
+ st_atime=0, st_mtime=0, st_ctime=0)
+ enoent = ["/dev/NOENT"]
+ real_stat = os.stat
+ resize_calls = []
+
+ class myresizer(object):
+ def resize(self, diskdev, partnum, partdev):
+ resize_calls.append((diskdev, partnum, partdev))
+ if partdev == "/dev/YYda2":
+ return (1024, 2048)
+ return (1024, 1024) # old size, new size
+
+ def mystat(path):
+ if path in devs:
+ return devstat_ret
+ if path in enoent:
+ e = OSError("%s: does not exist" % path)
+ e.errno = errno.ENOENT
+ raise e
+ return real_stat(path)
+
+ try:
+ opinfo = cc_growpart.device_part_info
+ cc_growpart.device_part_info = simple_device_part_info
+ os.stat = mystat
+
+ resized = cc_growpart.resize_devices(myresizer(), devs + enoent)
+
+ def find(name, res):
+ for f in res:
+ if f[0] == name:
+ return f
+ return None
+
+ self.assertEqual(cc_growpart.RESIZE.NOCHANGE,
+ find("/dev/XXda1", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.CHANGED,
+ find("/dev/YYda2", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.SKIPPED,
+ find(enoent[0], resized)[1])
+ #self.assertEqual(resize_calls,
+ #[("/dev/XXda", "1", "/dev/XXda1"),
+ #("/dev/YYda", "2", "/dev/YYda2")])
+ finally:
+ cc_growpart.device_part_info = opinfo
+ os.stat = real_stat
+
+
+def simple_device_part_info(devpath):
+ # simple stupid return (/dev/vda, 1) for /dev/vda
+ ret = re.search("([^0-9]*)([0-9]*)$", devpath)
+ x = (ret.group(1), ret.group(2))
+ return x
+
+
+class Bunch:
+ st_mode = None # fix pylint complaint
+
+ def __init__(self, **kwds):
+ self.__dict__.update(kwds)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
new file mode 100644
index 00000000..d8662cac
--- /dev/null
+++ b/tests/unittests/test_sshutil.py
@@ -0,0 +1,101 @@
+from cloudinit import ssh_util
+from unittest import TestCase
+
+
+VALID_CONTENT = {
+ 'dsa': (
+ "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
+ "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
+ "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
+ "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv"
+ "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4"
+ "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7"
+ "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P"
+ "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE"
+ "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
+ "5z7u2rVAlDw=="
+ ),
+ 'ecdsa': (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
+ "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
+ "Ql7lc2leWL7CY="
+ ),
+ 'rsa': (
+ "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
+ "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
+ "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
+ "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT"
+ "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
+ "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
+ ),
+}
+
+TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
+ 'command="echo \'Please login as the user \"ubuntu\" rather than the'
+ 'user \"root\".\';echo;sleep 10"')
+
+
+class TestAuthKeyLineParser(TestCase):
+ def test_simple_parse(self):
+ # test key line with common 3 fields (keytype, base64, comment)
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_no_comment(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ line = ' '.join((ktype, content,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertFalse(key.comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_keyoptions(self):
+ # test key line with options in it
+ parser = ssh_util.AuthKeyLineParser()
+ options = TEST_OPTIONS
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((options, ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertEqual(key.options, options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_options_passed_in(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+
+ baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host"))
+ myopts = "no-port-forwarding,no-agent-forwarding"
+
+ key = parser.parse("allowedopt" + " " + baseline)
+ self.assertEqual(key.options, "allowedopt")
+
+ key = parser.parse("overridden_opt " + baseline, options=myopts)
+ self.assertEqual(key.options, myopts)
+
+ def test_parse_invalid_keytype(self):
+ parser = ssh_util.AuthKeyLineParser()
+ key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']]))
+
+ self.assertFalse(key.valid())
+
+
+# vi: ts=4 expandtab