# This file is part of cloud-init. See LICENSE file for license information. from cloudinit import cloud from cloudinit.config import cc_growpart from cloudinit import subp from cloudinit import temp_utils from tests.unittests.helpers import TestCase import errno import logging import os import shutil import re import unittest from contextlib import ExitStack from unittest import mock import stat # growpart: # mode: auto # off, on, auto, 'growpart' # devices: ['root'] HELP_GROWPART_RESIZE = """ growpart disk partition rewrite partition table so that partition takes up all the space it can options: -h | --help print Usage and exit -u | --update R update the the kernel partition table info after growing this requires kernel support and 'partx --update' R is one of: - 'auto' : [default] update partition if possible Example: - growpart /dev/sda 1 Resize partition 1 on /dev/sda """ HELP_GROWPART_NO_RESIZE = """ growpart disk partition rewrite partition table so that partition takes up all the space it can options: -h | --help print Usage and exit Example: - growpart /dev/sda 1 Resize partition 1 on /dev/sda """ HELP_GPART = """ usage: gpart add -t type [-a alignment] [-b start] geom gpart backup geom gpart bootcode [-b bootcode] [-p partcode -i index] [-f flags] geom gpart resize -i index [-a alignment] [-s size] [-f flags] geom gpart restore [-lF] [-f flags] provider [...] gpart recover [-f flags] geom gpart help """ class Dir: '''Stub object''' def __init__(self, name): self.name = name self.st_mode = name def is_dir(self, *args, **kwargs): return True def stat(self, *args, **kwargs): return self class Scanner: '''Stub object''' def __enter__(self): return (Dir(''), Dir(''),) def __exit__(self, *args): pass class TestDisabled(unittest.TestCase): def setUp(self): super(TestDisabled, self).setUp() self.name = "growpart" self.cloud_init = None self.log = logging.getLogger("TestDisabled") self.args = [] self.handle = cc_growpart.handle def test_mode_off(self): # Test that nothing is done if mode is off. # this really only verifies that resizer_factory isn't called config = {'growpart': {'mode': 'off'}} with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj: self.handle(self.name, config, self.cloud_init, self.log, self.args) self.assertEqual(mockobj.call_count, 0) class TestConfig(TestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "growpart" self.paths = None self.cloud = cloud.Cloud(None, self.paths, None, None, None) self.log = logging.getLogger("TestConfig") self.args = [] self.cloud_init = None self.handle = cc_growpart.handle self.tmppath = '/tmp/cloudinit-test-file' self.tmpdir = os.scandir('/tmp') self.tmpfile = open(self.tmppath, 'w') def tearDown(self): self.tmpfile.close() os.remove(self.tmppath) @mock.patch.dict("os.environ", clear=True) def test_no_resizers_auto_is_fine(self): with mock.patch.object( subp, 'subp', return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: config = {'growpart': {'mode': 'auto'}} self.handle(self.name, config, self.cloud_init, self.log, self.args) mockobj.assert_has_calls([ mock.call(['growpart', '--help'], env={'LANG': 'C'}), mock.call(['gpart', 'help'], env={'LANG': 'C'}, rcs=[0, 1])]) @mock.patch.dict("os.environ", clear=True) def test_no_resizers_mode_growpart_is_exception(self): with mock.patch.object( subp, 'subp', return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: config = {'growpart': {'mode': "growpart"}} self.assertRaises( ValueError, self.handle, self.name, config, self.cloud_init, self.log, self.args) mockobj.assert_called_once_with( ['growpart', '--help'], env={'LANG': 'C'}) @mock.patch.dict("os.environ", clear=True) def test_mode_auto_prefers_growpart(self): with mock.patch.object( subp, 'subp', return_value=(HELP_GROWPART_RESIZE, "")) as mockobj: ret = cc_growpart.resizer_factory(mode="auto") self.assertIsInstance(ret, cc_growpart.ResizeGrowPart) mockobj.assert_called_once_with( ['growpart', '--help'], env={'LANG': 'C'}) @mock.patch.dict("os.environ", {'LANG': 'cs_CZ.UTF-8'}, clear=True) @mock.patch.object(temp_utils, 'mkdtemp', return_value='/tmp/much-random') @mock.patch.object(stat, 'S_ISDIR', return_value=False) @mock.patch.object(os.path, 'samestat', return_value=True) @mock.patch.object(os.path, "join", return_value='/tmp') @mock.patch.object(os, 'scandir', return_value=Scanner()) @mock.patch.object(os, 'mkdir') @mock.patch.object(os, 'unlink') @mock.patch.object(os, 'rmdir') @mock.patch.object(os, 'open', return_value=1) @mock.patch.object(os, 'close') @mock.patch.object(shutil, 'rmtree') @mock.patch.object(os, 'lseek', return_value=1024) @mock.patch.object(os, 'lstat', return_value='interesting metadata') def test_force_lang_check_tempfile(self, *args, **kwargs): with mock.patch.object( subp, 'subp', return_value=(HELP_GROWPART_RESIZE, "")) as mockobj: ret = cc_growpart.resizer_factory(mode="auto") self.assertIsInstance(ret, cc_growpart.ResizeGrowPart) diskdev = '/dev/sdb' partnum = 1 partdev = '/dev/sdb' ret.resize(diskdev, partnum, partdev) mockobj.assert_has_calls([ mock.call( ["growpart", '--dry-run', diskdev, partnum], env={'LANG': 'C', 'TMPDIR': '/tmp'}), mock.call( ["growpart", diskdev, partnum], env={'LANG': 'C', 'TMPDIR': '/tmp'}), ]) @mock.patch.dict("os.environ", {'LANG': 'cs_CZ.UTF-8'}, clear=True) def test_mode_auto_falls_back_to_gpart(self): with mock.patch.object( subp, 'subp', return_value=("", HELP_GPART)) as mockobj: ret = cc_growpart.resizer_factory(mode="auto") self.assertIsInstance(ret, cc_growpart.ResizeGpart) mockobj.assert_has_calls([ mock.call(['growpart', '--help'], env={'LANG': 'C'}), mock.call(['gpart', 'help'], env={'LANG': 'C'}, rcs=[0, 1])]) def test_handle_with_no_growpart_entry(self): # if no 'growpart' entry in config, then mode=auto should be used myresizer = object() retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),) with ExitStack() as mocks: factory = mocks.enter_context( mock.patch.object(cc_growpart, 'resizer_factory', return_value=myresizer)) rsdevs = mocks.enter_context( mock.patch.object(cc_growpart, 'resize_devices', return_value=retval)) mocks.enter_context( mock.patch.object(cc_growpart, 'RESIZERS', (('mysizer', object),) )) self.handle(self.name, {}, self.cloud_init, self.log, self.args) factory.assert_called_once_with('auto') rsdevs.assert_called_once_with(myresizer, ['/']) class TestResize(unittest.TestCase): def setUp(self): super(TestResize, self).setUp() self.name = "growpart" self.log = logging.getLogger("TestResize") def test_simple_devices(self): # test simple device list # this patches out devent2dev, os.stat, and device_part_info # so in the end, doesn't test a lot devs = ["/dev/XXda1", "/dev/YYda2"] devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5, st_nlink=1, st_uid=0, st_gid=6, st_size=0, st_atime=0, st_mtime=0, st_ctime=0) enoent = ["/dev/NOENT"] real_stat = os.stat resize_calls = [] class myresizer(object): def resize(self, diskdev, partnum, partdev): resize_calls.append((diskdev, partnum, partdev)) if partdev == "/dev/YYda2": return (1024, 2048) return (1024, 1024) # old size, new size def mystat(path): if path in devs: return devstat_ret if path in enoent: e = OSError("%s: does not exist" % path) e.errno = errno.ENOENT raise e return real_stat(path) try: opinfo = cc_growpart.device_part_info cc_growpart.device_part_info = simple_device_part_info os.stat = mystat resized = cc_growpart.resize_devices(myresizer(), devs + enoent) def find(name, res): for f in res: if f[0] == name: return f return None self.assertEqual(cc_growpart.RESIZE.NOCHANGE, find("/dev/XXda1", resized)[1]) self.assertEqual(cc_growpart.RESIZE.CHANGED, find("/dev/YYda2", resized)[1]) self.assertEqual(cc_growpart.RESIZE.SKIPPED, find(enoent[0], resized)[1]) # self.assertEqual(resize_calls, # [("/dev/XXda", "1", "/dev/XXda1"), # ("/dev/YYda", "2", "/dev/YYda2")]) finally: cc_growpart.device_part_info = opinfo os.stat = real_stat def simple_device_part_info(devpath): # simple stupid return (/dev/vda, 1) for /dev/vda ret = re.search("([^0-9]*)([0-9]*)$", devpath) x = (ret.group(1), ret.group(2)) return x class Bunch(object): def __init__(self, **kwds): self.__dict__.update(kwds) # vi: ts=4 expandtab