summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler/test_handler_snappy.py
blob: 76b79c29a657277d4af9f5f88cd7460dcfb9223e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.config.cc_snappy import (
    makeop, get_package_ops, render_snap_op)
from cloudinit.config.cc_snap_config import (
    add_assertions, add_snap_user, ASSERTIONS_FILE)
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.config.cc_snap_config import handle as snap_handle
from cloudinit.sources import DataSourceNone
from cloudinit.tests.helpers import FilesystemMockingTestCase, mock

from cloudinit.tests import helpers as t_help

import logging
import os
import shutil
import tempfile
import textwrap
import yaml

# Module-level logger; it is handed to the snap_config handler
# (snap_handle) as its log argument by the tests in this file.
LOG = logging.getLogger(__name__)
# NOTE(review): not referenced by anything else visible in this file;
# presumably the value types accepted for a snap config entry -- confirm
# before relying on it or removing it.
ALLOWED = (dict, list, int, str)


class TestInstallPackages(t_help.TestCase):
    """Tests for get_package_ops() and render_snap_op() from cc_snappy.

    util.subp is replaced with self._subp for the duration of each test, so
    no command is actually executed.  Every intercepted 'snappy install' or
    'snappy config' invocation is recorded in self.snapcmds as a 3-entry
    list of [mode, package, config].
    """

    def setUp(self):
        """Create the per-test temp dir and patch util.subp.

        State is initialised and cleanups are registered *before* the patch
        is applied, so util.subp is restored and the temp dir removed even
        when a later step of setUp fails (tearDown is not run in that case,
        cleanups are).
        """
        super(TestInstallPackages, self).setUp()
        self.unapply = []
        self.subp_called = []
        self.snapcmds = []

        self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
        self.addCleanup(shutil.rmtree, self.tmp)

        # Cleanups run in reverse order of registration: patches are
        # restored first, then the temp dir is removed.
        self.addCleanup(self._restore_patches)

        # route every util.subp call through the recording fake below
        self.apply_patches([(util, 'subp', self._subp)])

    def _restore_patches(self):
        """Undo everything recorded by apply_patches(), newest first."""
        apply_patches([i for i in reversed(self.unapply)])
        self.unapply = []

    def apply_patches(self, patches):
        """Apply (ref, name, replacement) patches and remember the originals.

        The originals are appended to self.unapply so that they can be put
        back once the test is over.
        """
        ret = apply_patches(patches)
        self.unapply += ret

    def populate_tmp(self, files):
        """Populate self.tmp from a {filename: content} dict."""
        return t_help.populate_dir(self.tmp, files)

    @staticmethod
    def _load_yaml_file(path):
        """Return the yaml-loaded content of the file at path."""
        with open(path, "rb") as fp:
            return yaml.safe_load(fp.read())

    def _subp(self, *args, **kwargs):
        """Stand-in for util.subp that records snappy invocations.

        The raw keyword arguments of every call are appended to
        self.subp_called.  'snappy config' and 'snappy install' commands are
        additionally parsed and appended to self.snapcmds as
        [mode, pkg, config].  Nothing is executed and None is returned.
        """
        # supports subp calling with cmd as args or kwargs
        if 'args' not in kwargs:
            kwargs['args'] = args[0]
        self.subp_called.append(kwargs)
        cmd = kwargs['args']
        # here we basically parse the snappy command invoked
        # and append to snapcmds a list of (mode, pkg, config)
        if cmd[0:2] == ['snappy', 'config']:
            # cmd[2] is the package, cmd[3] the config file ("-" is stdin)
            if cmd[3] == "-":
                config = kwargs.get('data', '')
            else:
                config = self._load_yaml_file(cmd[3])
            self.snapcmds.append(['config', cmd[2], config])
        elif cmd[0:2] == ['snappy', 'install']:
            config = None
            pkg = None
            for arg in cmd[2:]:
                # option flags carry no package or config information
                if arg.startswith("-"):
                    continue
                # first positional is the package, the next the config file
                if not pkg:
                    pkg = arg
                elif not config:
                    cfgfile = arg
                    if cfgfile == "-":
                        config = kwargs.get('data', '')
                    elif cfgfile:
                        config = self._load_yaml_file(cfgfile)
            self.snapcmds.append(['install', pkg, config])

    def test_package_ops_1(self):
        ret = get_package_ops(
            packages=['pkg1', 'pkg2', 'pkg3'],
            configs={'pkg2': b'mycfg2'}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'pkg1', None, None),
                  makeop('install', 'pkg2', b'mycfg2', None),
                  makeop('install', 'pkg3', None, None)])

    def test_package_ops_config_only(self):
        ret = get_package_ops(
            packages=None,
            configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
        self.assertEqual(
            ret, [makeop('config', 'pkg2', b'mycfg2')])

    def test_package_ops_install_and_config(self):
        ret = get_package_ops(
            packages=['pkg3', 'pkg2'],
            configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
            installed=['xinstalled'])
        self.assertEqual(
            ret, [makeop('install', 'pkg3'),
                  makeop('install', 'pkg2', b'mycfg2'),
                  makeop('config', 'xinstalled', b'xcfg')])

    def test_package_ops_install_long_config_short(self):
        # a package can be installed by full name, but have config by short
        cfg = {'k1': 'k2'}
        ret = get_package_ops(
            packages=['config-example.canonical'],
            configs={'config-example': cfg}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'config-example.canonical', cfg)])

    def test_package_ops_with_file(self):
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
             "snapf2.snap": b"foo2", "foo.bar": "ignored"})
        ret = get_package_ops(
            packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config"),
             makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
             makeop('install', 'pkg1')])

    def test_package_ops_common_filename(self):
        # fish package name from filename
        # package names likely look like: pkgname.namespace_version_arch.snap

        # find filenames
        self.populate_tmp(
            {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
             "pkg-ws.config": "pkg-ws-config",
             "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
             "pkg1.smoser.config": "pkg1.smoser.config-data",
             "pkg1.config": "pkg1.config-data",
             "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
             "pkg2.smoser_0.0_amd64.config": "pkg2.config"})

        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
                         path="pkg-ws.smoser_0.3.4_all.snap",
                         cfgfile="pkg-ws.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
                         path="pkg1.smoser_1.2.3_all.snap",
                         cfgfile="pkg1.smoser.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
                         path="pkg2.smoser_0.0_amd64.snap",
                         cfgfile="pkg2.smoser_0.0_amd64.config"),
             ])

    def test_package_ops_config_overrides_file(self):
        # config data overrides local file .config
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
        ret = get_package_ops(
            packages=[], configs={'snapf1': 'snapf1cfg-config'},
            installed=[], fspath=self.tmp)
        self.assertEqual(
            ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
                              path="snapf1.snap", config="snapf1cfg-config")])

    def test_package_ops_namespacing(self):
        cfgs = {
            'config-example': {'k1': 'v1'},
            'pkg1': {'p1': 'p2'},
            'ubuntu-core': {'c1': 'c2'},
            'notinstalled.smoser': {'s1': 's2'},
        }
        ret = get_package_ops(
            packages=['config-example.canonical'], configs=cfgs,
            installed=['config-example.smoser', 'pkg1.canonical',
                       'ubuntu-core'])

        expected_configs = [
            makeop('config', 'pkg1', config=cfgs['pkg1']),
            makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
        expected_installs = [
            makeop('install', 'config-example.canonical',
                   config=cfgs['config-example'])]

        installs = [i for i in ret if i['op'] == 'install']
        configs = [c for c in ret if c['op'] == 'config']

        self.assertEqual(installs, expected_installs)
        # configs are not ordered
        self.assertEqual(len(configs), len(expected_configs))
        self.assertTrue(all(found in expected_configs for found in configs))

    def test_render_op_localsnap(self):
        self.populate_tmp({"snapf1.snap": b"foo1"})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], None]])

    def test_render_op_localsnap_localconfig(self):
        self.populate_tmp(
            {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap', cfgfile='snapf1.config')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], 'snapf1cfg']])

    def test_render_op_snap(self):
        op = makeop('install', 'snapf1')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', 'snapf1', None]])

    def test_render_op_snap_config(self):
        mycfg = {'key1': 'value1'}
        name = "snapf1"
        op = makeop('install', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', name, {'config': {name: mycfg}}]])

    def test_render_op_config_bytes(self):
        name = "snapf1"
        mycfg = b'myconfig'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_string(self):
        name = 'snapf1'
        mycfg = 'myconfig: foo\nhisconfig: bar\n'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_dict(self):
        # config entry for package can be a dict, not a string blob
        mycfg = {'foo': 'bar'}
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        # snapcmds is a list of 3-entry lists. data_found will be the
        # config data that the intercepted 'snappy config' call was given.
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_list(self):
        # config entry for package can be a list, not a string blob
        mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
        name = "snapf1"
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_int(self):
        # config entry for package can be an int, not a string blob
        mycfg = 1
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_long_configs_short(self):
        # install a namespaced package should have un-namespaced config
        mycfg = {'k1': 'k2'}
        name = 'snapf1'
        op = makeop('install', name + ".smoser", config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_does_not_pad_cfgfile(self):
        # package_ops with cfgfile should not modify --file= content.
        mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config")])

        # now the op was ok, but test that render didn't mess it up.
        render_snap_op(**ret[0])
        data_found = self.snapcmds[0][2]
        # the data found gets loaded in the snapcmd interpretation
        # so this comparison is a bit lossy, but input to snappy config
        # is expected to be yaml loadable, so it should be OK.
        self.assertEqual(yaml.safe_load(mydata), data_found)


class TestSnapConfig(FilesystemMockingTestCase):
    """Tests for the cc_snap_config module.

    Covers add_assertions(), add_snap_user() and the module's handle()
    entry point (imported here as snap_handle).  Interaction with the
    system goes through mocks of cloudinit.util / the module's util
    reference, so no 'snap' command is executed.
    """

    # Sample 'system-user' assertion; also used as the fake output of the
    # 'snap known system-user' query in the tests below.
    SYSTEM_USER_ASSERTION = textwrap.dedent("""
    type: system-user
    authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    email: foo@bar.com
    password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
    series:
      - 16
    since: 2016-09-10T16:34:00+03:00
    until: 2017-11-10T16:34:00+03:00
    username: baz
    sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj

    AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
    Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
    zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
    s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
    +to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
    Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
    d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
    BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
    f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
    v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q==""")

    # Sample 'account-key' assertion.
    ACCOUNT_ASSERTION = textwrap.dedent("""
    type: account-key
    authority-id: canonical
    revision: 2
    public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
    account-id: canonical
    name: store
    since: 2016-04-01T00:00:00.0Z
    body-length: 717
    sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH

    AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
    qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
    vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
    UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
    Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
    o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
    VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
    2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
    Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
    vUvV7RjVzv17ut0AEQEAAQ==

    AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
    WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
    nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
    3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
    eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
    inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
    rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
    rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
    aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
    6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
    haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
    yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
    HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
    skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
    CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
    ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
    qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
    IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
    oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k""")

    # The assertion list handed to add_assertions() / the handler config.
    test_assertions = [ACCOUNT_ASSERTION, SYSTEM_USER_ASSERTION]

    def setUp(self):
        """Create a scratch root directory that is removed after the test."""
        super(TestSnapConfig, self).setUp()
        self.subp = util.subp
        self.new_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.new_root)

    def _get_cloud(self, distro, metadata=None):
        """Return a cloud.Cloud for the named distro backed by DataSourceNone.

        util is patched to operate under self.new_root first.  When metadata
        is given it is merged into the datasource's metadata.
        """
        self.patchUtils(self.new_root)
        paths = helpers.Paths({})
        cls = distros.fetch(distro)
        mydist = cls(distro, {}, paths)
        myds = DataSourceNone.DataSourceNone({}, mydist, paths)
        if metadata:
            myds.metadata.update(metadata)
        return cloud.Cloud(myds, paths, {}, mydist, None)

    # NOTE: mock.patch decorators are applied bottom-up, so the mock of the
    # decorator closest to the function is the first extra argument.
    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions(self, msubp, mwrite):
        add_assertions(self.test_assertions)

        # assertions are written newline-joined, then acked in one call
        combined = "\n".join(self.test_assertions)
        mwrite.assert_any_call(ASSERTIONS_FILE, combined.encode('utf-8'))
        msubp.assert_called_with(['snap', 'ack', ASSERTIONS_FILE],
                                 capture=True)

    def test_snap_config_add_assertions_empty(self):
        self.assertRaises(ValueError, add_assertions, [])

    def test_add_assertions_nonlist(self):
        self.assertRaises(ValueError, add_assertions, {})

    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions_ack_fails(self, msubp, mwrite):
        # a failing 'snap ack' must propagate to the caller
        msubp.side_effect = [util.ProcessExecutionError("Invalid assertion")]
        self.assertRaises(util.ProcessExecutionError, add_assertions,
                          self.test_assertions)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_no_config(self, mock_util, mock_add):
        cfg = {}
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        snap_handle('snap_config', cfg, cc, LOG, None)
        mock_add.assert_not_called()

    def test_snap_config_add_snap_user_no_config(self):
        usercfg = add_snap_user(cfg=None)
        self.assertIsNone(usercfg)

    def test_snap_config_add_snap_user_not_dict(self):
        cfg = ['foobar']
        self.assertRaises(ValueError, add_snap_user, cfg)

    def test_snap_config_add_snap_user_no_email(self):
        cfg = {'assertions': [], 'known': True}
        usercfg = add_snap_user(cfg=cfg)
        self.assertIsNone(usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_only(self, mock_util):
        email = 'janet@planetjanet.org'
        cfg = {'email': email}
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        usercfg = add_snap_user(cfg=cfg)

        self.assertEqual(usercfg, {'snapuser': email, 'known': False})

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_known(self, mock_util):
        email = 'janet@planetjanet.org'
        known = True
        cfg = {'email': email, 'known': known}
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        usercfg = add_snap_user(cfg=cfg)

        self.assertEqual(usercfg, {'snapuser': email, 'known': known})

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_system_not_snappy(self, mock_util, mock_add):
        cfg = {'snappy': {'assertions': self.test_assertions}}
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = False

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser(self, mock_util, mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        usercfg = {'snapuser': email, 'known': False}
        cc.distro.create_user.assert_called_with(email, **usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known(self, mock_util, mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        usercfg = {'snapuser': email, 'known': True}
        cc.distro.create_user.assert_called_with(email, **usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_managed(self, mock_util,
                                                       mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        # an already managed device: only the 'snap managed' query is made
        mock_util.subp.side_effect = [
            ("true\n", ""),  # snap managed
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        cc.distro.create_user.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_no_assertion(self, mock_util,
                                                            mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': [self.ACCOUNT_ASSERTION],
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        # The device must report itself as NOT managed here; otherwise the
        # 'snap known system-user' reply is never consumed and this test
        # would merely repeat the already-managed case above.
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            ("", ""),         # snap known system-user
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with([self.ACCOUNT_ASSERTION])
        cc.distro.create_user.assert_not_called()


def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
    """Build an op with makeop(), placing path and cfgfile under tmpd.

    A truthy path / cfgfile is prefixed with tmpd (joined with
    os.path.sep); a falsy one is handed to makeop() unchanged.
    """
    def _under_tmpd(relname):
        # leave None / empty values untouched
        if not relname:
            return relname
        return os.path.sep.join([tmpd, relname])

    return makeop(op=op, name=name, config=config,
                  path=_under_tmpd(path), cfgfile=_under_tmpd(cfgfile))


def apply_patches(patches):
    """Set attributes on objects and return what is needed to undo that.

    patches is an iterable of (ref, name, replace) triples.  For each
    triple whose replace is not None, attribute name of ref is set to
    replace.  The return value is a list of (ref, name, original_value)
    triples, in the order they were applied; entries with a None
    replacement are skipped and do not appear in it.
    """
    undo = []
    for target, attr, new_value in patches:
        if new_value is not None:
            previous = getattr(target, attr)
            setattr(target, attr, new_value)
            undo.append((target, attr, previous))
    return undo

# vi: ts=4 expandtab