summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler/test_handler_growpart.py
blob: b7d5d7ba5c49c49828cf50adcf6f4fc94f022485 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit import cloud
from cloudinit.config import cc_growpart
from cloudinit import subp
from cloudinit import temp_utils

from cloudinit.tests.helpers import TestCase

import errno
import logging
import os
import shutil
import re
import unittest
from contextlib import ExitStack
from unittest import mock
import stat

# growpart:
#   mode: auto  # off, on, auto, 'growpart'
#   devices: ['root']

HELP_GROWPART_RESIZE = """
growpart disk partition
   rewrite partition table so that partition takes up all the space it can
   options:
    -h | --help       print Usage and exit
<SNIP>
    -u | --update  R  update the the kernel partition table info after growing
                      this requires kernel support and 'partx --update'
                      R is one of:
                       - 'auto'  : [default] update partition if possible
<SNIP>
   Example:
    - growpart /dev/sda 1
      Resize partition 1 on /dev/sda
"""

HELP_GROWPART_NO_RESIZE = """
growpart disk partition
   rewrite partition table so that partition takes up all the space it can
   options:
    -h | --help       print Usage and exit
<SNIP>
   Example:
    - growpart /dev/sda 1
      Resize partition 1 on /dev/sda
"""

HELP_GPART = """
usage: gpart add -t type [-a alignment] [-b start] <SNIP> geom
       gpart backup geom
       gpart bootcode [-b bootcode] [-p partcode -i index] [-f flags] geom
<SNIP>
       gpart resize -i index [-a alignment] [-s size] [-f flags] geom
       gpart restore [-lF] [-f flags] provider [...]
       gpart recover [-f flags] geom
       gpart help
<SNIP>
"""


class Dir:
    '''Stub object'''
    def __init__(self, name):
        self.name = name
        self.st_mode = name

    def is_dir(self, *args, **kwargs):
        return True

    def stat(self, *args, **kwargs):
        return self


class Scanner:
    '''Stub object'''
    def __enter__(self):
        return (Dir(''), Dir(''),)

    def __exit__(self, *args):
        pass


class TestDisabled(unittest.TestCase):
    def setUp(self):
        super(TestDisabled, self).setUp()
        self.name = "growpart"
        self.cloud_init = None
        self.log = logging.getLogger("TestDisabled")
        self.args = []

        self.handle = cc_growpart.handle

    def test_mode_off(self):
        # Test that nothing is done if mode is off.

        # this really only verifies that resizer_factory isn't called
        config = {'growpart': {'mode': 'off'}}

        with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj:
            self.handle(self.name, config, self.cloud_init, self.log,
                        self.args)
            self.assertEqual(mockobj.call_count, 0)


class TestConfig(TestCase):
    def setUp(self):
        super(TestConfig, self).setUp()
        self.name = "growpart"
        self.paths = None
        self.cloud = cloud.Cloud(None, self.paths, None, None, None)
        self.log = logging.getLogger("TestConfig")
        self.args = []

        self.cloud_init = None
        self.handle = cc_growpart.handle
        self.tmppath = '/tmp/cloudinit-test-file'
        self.tmpdir = os.scandir('/tmp')
        self.tmpfile = open(self.tmppath, 'w')

    def tearDown(self):
        self.tmpfile.close()
        os.remove(self.tmppath)

    @mock.patch.dict("os.environ", clear=True)
    def test_no_resizers_auto_is_fine(self):
        with mock.patch.object(
                subp, 'subp',
                return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:

            config = {'growpart': {'mode': 'auto'}}
            self.handle(self.name, config, self.cloud_init, self.log,
                        self.args)

            mockobj.assert_has_calls([
                mock.call(['growpart', '--help'], env={'LANG': 'C'}),
                mock.call(['gpart', 'help'], env={'LANG': 'C'}, rcs=[0, 1])])

    @mock.patch.dict("os.environ", clear=True)
    def test_no_resizers_mode_growpart_is_exception(self):
        with mock.patch.object(
                subp, 'subp',
                return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
            config = {'growpart': {'mode': "growpart"}}
            self.assertRaises(
                ValueError, self.handle, self.name, config,
                self.cloud_init, self.log, self.args)

            mockobj.assert_called_once_with(
                ['growpart', '--help'], env={'LANG': 'C'})

    @mock.patch.dict("os.environ", clear=True)
    def test_mode_auto_prefers_growpart(self):
        with mock.patch.object(
                subp, 'subp',
                return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:
            ret = cc_growpart.resizer_factory(mode="auto")
            self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)

            mockobj.assert_called_once_with(
                ['growpart', '--help'], env={'LANG': 'C'})

    @mock.patch.dict("os.environ", {'LANG': 'cs_CZ.UTF-8'}, clear=True)
    @mock.patch.object(temp_utils, 'mkdtemp', return_value='/tmp/much-random')
    @mock.patch.object(stat, 'S_ISDIR', return_value=False)
    @mock.patch.object(os.path, 'samestat', return_value=True)
    @mock.patch.object(os.path, "join", return_value='/tmp')
    @mock.patch.object(os, 'scandir', return_value=Scanner())
    @mock.patch.object(os, 'mkdir')
    @mock.patch.object(os, 'unlink')
    @mock.patch.object(os, 'rmdir')
    @mock.patch.object(os, 'open', return_value=1)
    @mock.patch.object(os, 'close')
    @mock.patch.object(shutil, 'rmtree')
    @mock.patch.object(os, 'lseek', return_value=1024)
    @mock.patch.object(os, 'lstat', return_value='interesting metadata')
    def test_force_lang_check_tempfile(self, *args, **kwargs):
        with mock.patch.object(
                subp,
                'subp',
                return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:

            ret = cc_growpart.resizer_factory(mode="auto")
            self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
            diskdev = '/dev/sdb'
            partnum = 1
            partdev = '/dev/sdb'
            ret.resize(diskdev, partnum, partdev)
        mockobj.assert_has_calls([
            mock.call(
                ["growpart", '--dry-run', diskdev, partnum],
                env={'LANG': 'C', 'TMPDIR': '/tmp'}),
            mock.call(
                ["growpart", diskdev, partnum],
                env={'LANG': 'C', 'TMPDIR': '/tmp'}),
        ])

    @mock.patch.dict("os.environ", {'LANG': 'cs_CZ.UTF-8'}, clear=True)
    def test_mode_auto_falls_back_to_gpart(self):
        with mock.patch.object(
                subp, 'subp',
                return_value=("", HELP_GPART)) as mockobj:
            ret = cc_growpart.resizer_factory(mode="auto")
            self.assertIsInstance(ret, cc_growpart.ResizeGpart)

            mockobj.assert_has_calls([
                mock.call(['growpart', '--help'], env={'LANG': 'C'}),
                mock.call(['gpart', 'help'], env={'LANG': 'C'}, rcs=[0, 1])])

    def test_handle_with_no_growpart_entry(self):
        # if no 'growpart' entry in config, then mode=auto should be used

        myresizer = object()
        retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),)

        with ExitStack() as mocks:
            factory = mocks.enter_context(
                mock.patch.object(cc_growpart, 'resizer_factory',
                                  return_value=myresizer))
            rsdevs = mocks.enter_context(
                mock.patch.object(cc_growpart, 'resize_devices',
                                  return_value=retval))
            mocks.enter_context(
                mock.patch.object(cc_growpart, 'RESIZERS',
                                  (('mysizer', object),)
                                  ))

            self.handle(self.name, {}, self.cloud_init, self.log, self.args)

            factory.assert_called_once_with('auto')
            rsdevs.assert_called_once_with(myresizer, ['/'])


class TestResize(unittest.TestCase):
    def setUp(self):
        super(TestResize, self).setUp()
        self.name = "growpart"
        self.log = logging.getLogger("TestResize")

    def test_simple_devices(self):
        # test simple device list
        # this patches out devent2dev, os.stat, and device_part_info
        # so in the end, doesn't test a lot
        devs = ["/dev/XXda1", "/dev/YYda2"]
        devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,
                            st_nlink=1, st_uid=0, st_gid=6, st_size=0,
                            st_atime=0, st_mtime=0, st_ctime=0)
        enoent = ["/dev/NOENT"]
        real_stat = os.stat
        resize_calls = []

        class myresizer(object):
            def resize(self, diskdev, partnum, partdev):
                resize_calls.append((diskdev, partnum, partdev))
                if partdev == "/dev/YYda2":
                    return (1024, 2048)
                return (1024, 1024)  # old size, new size

        def mystat(path):
            if path in devs:
                return devstat_ret
            if path in enoent:
                e = OSError("%s: does not exist" % path)
                e.errno = errno.ENOENT
                raise e
            return real_stat(path)

        try:
            opinfo = cc_growpart.device_part_info
            cc_growpart.device_part_info = simple_device_part_info
            os.stat = mystat

            resized = cc_growpart.resize_devices(myresizer(), devs + enoent)

            def find(name, res):
                for f in res:
                    if f[0] == name:
                        return f
                return None

            self.assertEqual(cc_growpart.RESIZE.NOCHANGE,
                             find("/dev/XXda1", resized)[1])
            self.assertEqual(cc_growpart.RESIZE.CHANGED,
                             find("/dev/YYda2", resized)[1])
            self.assertEqual(cc_growpart.RESIZE.SKIPPED,
                             find(enoent[0], resized)[1])
            # self.assertEqual(resize_calls,
            #                 [("/dev/XXda", "1", "/dev/XXda1"),
            #                  ("/dev/YYda", "2", "/dev/YYda2")])
        finally:
            cc_growpart.device_part_info = opinfo
            os.stat = real_stat


def simple_device_part_info(devpath):
    # simple stupid return (/dev/vda, 1) for /dev/vda
    ret = re.search("([^0-9]*)([0-9]*)$", devpath)
    x = (ret.group(1), ret.group(2))
    return x


class Bunch(object):
    def __init__(self, **kwds):
        self.__dict__.update(kwds)


# vi: ts=4 expandtab