from mocker import MockerTestCase from cloudinit import cloud from cloudinit import helpers from cloudinit import util from cloudinit.config import cc_growpart import errno import logging import os import mocker import re import stat # growpart: # mode: auto # off, on, auto, 'growpart', 'parted' # devices: ['root'] HELP_PARTED_NO_RESIZE = """ Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in interactive mode. OPTIONs: COMMANDs: quit exit program rescue START END rescue a lost partition near START and END resize NUMBER START END resize partition NUMBER and its file system rm NUMBER delete partition NUMBER Report bugs to bug-parted@gnu.org """ HELP_PARTED_RESIZE = """ Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in interactive mode. OPTIONs: COMMANDs: quit exit program rescue START END rescue a lost partition near START and END resize NUMBER START END resize partition NUMBER and its file system resizepart NUMBER END resize partition NUMBER rm NUMBER delete partition NUMBER Report bugs to bug-parted@gnu.org """ HELP_GROWPART_RESIZE = """ growpart disk partition rewrite partition table so that partition takes up all the space it can options: -h | --help print Usage and exit -u | --update R update the the kernel partition table info after growing this requires kernel support and 'partx --update' R is one of: - 'auto' : [default] update partition if possible Example: - growpart /dev/sda 1 Resize partition 1 on /dev/sda """ HELP_GROWPART_NO_RESIZE = """ growpart disk partition rewrite partition table so that partition takes up all the space it can options: -h | --help print Usage and exit Example: - growpart /dev/sda 1 Resize partition 1 on /dev/sda """ class TestDisabled(MockerTestCase): def setUp(self): super(TestDisabled, self).setUp() self.name = "growpart" self.cloud_init = None self.log = logging.getLogger("TestDisabled") self.args = [] self.handle = cc_growpart.handle def test_mode_off(self): #Test that nothing is done if mode is off. # this really only verifies that resizer_factory isn't called config = {'growpart': {'mode': 'off'}} self.mocker.replace(cc_growpart.resizer_factory, passthrough=False) self.mocker.replay() self.handle(self.name, config, self.cloud_init, self.log, self.args) class TestConfig(MockerTestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "growpart" self.paths = None self.cloud = cloud.Cloud(None, self.paths, None, None, None) self.log = logging.getLogger("TestConfig") self.args = [] os.environ = {} self.cloud_init = None self.handle = cc_growpart.handle # Order must be correct self.mocker.order() def test_no_resizers_auto_is_fine(self): subp = self.mocker.replace(util.subp, passthrough=False) subp(['parted', '--help'], env={'LANG': 'C'}) self.mocker.result((HELP_PARTED_NO_RESIZE,"")) subp(['growpart', '--help'], env={'LANG': 'C'}) self.mocker.result((HELP_GROWPART_NO_RESIZE,"")) self.mocker.replay() config = {'growpart': {'mode': 'auto'}} self.handle(self.name, config, self.cloud_init, self.log, self.args) def test_no_resizers_mode_growpart_is_exception(self): subp = self.mocker.replace(util.subp, passthrough=False) subp(['growpart', '--help'], env={'LANG': 'C'}) self.mocker.result((HELP_GROWPART_NO_RESIZE,"")) self.mocker.replay() config = {'growpart': {'mode': "growpart"}} self.assertRaises(ValueError, self.handle, self.name, config, self.cloud_init, self.log, self.args) def test_mode_auto_prefers_parted(self): subp = self.mocker.replace(util.subp, passthrough=False) subp(['parted', '--help'], env={'LANG': 'C'}) self.mocker.result((HELP_PARTED_RESIZE,"")) self.mocker.replay() ret = cc_growpart.resizer_factory(mode="auto") self.assertTrue(isinstance(ret, cc_growpart.ResizeParted)) def test_handle_with_no_growpart_entry(self): #if no 'growpart' entry in config, then mode=auto should be used myresizer = object() factory = self.mocker.replace(cc_growpart.resizer_factory, passthrough=False) rsdevs = self.mocker.replace(cc_growpart.resize_devices, passthrough=False) factory("auto") self.mocker.result(myresizer) rsdevs(myresizer, ["/"]) self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),)) self.mocker.replay() try: orig_resizers = cc_growpart.RESIZERS cc_growpart.RESIZERS = (('mysizer', object),) self.handle(self.name, {}, self.cloud_init, self.log, self.args) finally: cc_growpart.RESIZERS = orig_resizers class TestResize(MockerTestCase): def setUp(self): super(TestResize, self).setUp() self.name = "growpart" self.log = logging.getLogger("TestResize") # Order must be correct self.mocker.order() def test_simple_devices(self): #test simple device list # this patches out devent2dev, os.stat, and device_part_info # so in the end, doesn't test a lot devs = ["/dev/XXda1", "/dev/YYda2"] devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L, st_nlink=1, st_uid=0, st_gid=6, st_size=0, st_atime=0, st_mtime=0, st_ctime=0) enoent = ["/dev/NOENT"] real_stat = os.stat resize_calls = [] class myresizer(): def resize(self, diskdev, partnum, partdev): resize_calls.append((diskdev, partnum, partdev)) if partdev == "/dev/YYda2": return (1024, 2048) return (1024, 1024) # old size, new size def mystat(path): if path in devs: return devstat_ret if path in enoent: e = OSError("%s: does not exist" % path) e.errno = errno.ENOENT raise e return real_stat(path) try: opinfo = cc_growpart.device_part_info cc_growpart.device_part_info = simple_device_part_info os.stat = mystat resized = cc_growpart.resize_devices(myresizer(), devs + enoent) def find(name, res): for f in res: if f[0] == name: return f return None self.assertEqual(cc_growpart.RESIZE.NOCHANGE, find("/dev/XXda1", resized)[1]) self.assertEqual(cc_growpart.RESIZE.CHANGED, find("/dev/YYda2", resized)[1]) self.assertEqual(cc_growpart.RESIZE.SKIPPED, find(enoent[0], resized)[1]) #self.assertEqual(resize_calls, #[("/dev/XXda", "1", "/dev/XXda1"), #("/dev/YYda", "2", "/dev/YYda2")]) finally: cc_growpart.device_part_info = opinfo os.stat = real_stat def simple_device_part_info(devpath): # simple stupid return (/dev/vda, 1) for /dev/vda ret = re.search("([^0-9]*)([0-9]*)$", devpath) x = (ret.group(1), ret.group(2)) return x class Bunch: def __init__(self, **kwds): self.__dict__.update(kwds) # vi: ts=4 expandtab