# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.config.cc_snappy import (
    makeop, get_package_ops, render_snap_op)
from cloudinit.config.cc_snap_config import (
    add_assertions, add_snap_user, ASSERTIONS_FILE)
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.config.cc_snap_config import handle as snap_handle
from cloudinit.sources import DataSourceNone
from cloudinit.tests.helpers import FilesystemMockingTestCase, mock

from cloudinit.tests import helpers as t_help

import logging
import os
import shutil
import tempfile
import textwrap
import yaml

LOG = logging.getLogger(__name__)
ALLOWED = (dict, list, int, str)


class TestInstallPackages(t_help.TestCase):
    """Tests for cc_snappy's get_package_ops and render_snap_op.

    util.subp is replaced by self._subp for the duration of each test, so
    no command is executed; the 'snappy' invocations are recorded in
    self.snapcmds instead.
    """

    def setUp(self):
        """Patch util.subp with the recorder and create a scratch dir."""
        super(TestInstallPackages, self).setUp()
        self.unapply = []

        # replace util.subp with self._subp; the originals are remembered
        # in self.unapply so that tearDown can restore them.
        self.apply_patches([(util, 'subp', self._subp)])
        self.subp_called = []
        self.snapcmds = []
        self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")

    def tearDown(self):
        """Undo the setUp patches and delete the scratch directory."""
        # unapply holds (ref, name, original) tuples; applying them in
        # reverse order puts each attribute back to its original value.
        apply_patches(reversed(self.unapply))
        shutil.rmtree(self.tmp)
        # setUp calls the parent setUp, so mirror that here.
        super(TestInstallPackages, self).tearDown()

    def apply_patches(self, patches):
        """Apply (ref, name, replacement) patches and remember the undo."""
        ret = apply_patches(patches)
        self.unapply += ret

    def populate_tmp(self, files):
        """Populate self.tmp from 'files' via t_help.populate_dir."""
        return t_help.populate_dir(self.tmp, files)

    def _subp(self, *args, **kwargs):
        """Stand-in for util.subp that records calls instead of running.

        Every call's kwargs are appended to self.subp_called.  For
        'snappy config' and 'snappy install' commands an entry of
        [mode, pkg, config] is appended to self.snapcmds.
        """
        # supports subp calling with cmd as args or kwargs
        if 'args' not in kwargs:
            kwargs['args'] = args[0]
        self.subp_called.append(kwargs)

        args = kwargs['args']
        # here we basically parse the snappy command invoked
        # and append to snapcmds a list of (mode, pkg, config)
        if args[0:2] == ['snappy', 'config']:
            if args[3] == "-":
                # "-" means the config was passed as input data
                config = kwargs.get('data', '')
            else:
                with open(args[3], "rb") as fp:
                    config = yaml.safe_load(fp.read())
            self.snapcmds.append(['config', args[2], config])
        elif args[0:2] == ['snappy', 'install']:
            config = None
            pkg = None
            for arg in args[2:]:
                # skip option flags
                if arg.startswith("-"):
                    continue
                # first positional is the package, the next is the config
                if not pkg:
                    pkg = arg
                elif not config:
                    cfgfile = arg
                    if cfgfile == "-":
                        config = kwargs.get('data', '')
                    elif cfgfile:
                        with open(cfgfile, "rb") as fp:
                            config = yaml.safe_load(fp.read())
            self.snapcmds.append(['install', pkg, config])

    def test_package_ops_1(self):
        ret = get_package_ops(
            packages=['pkg1', 'pkg2', 'pkg3'],
            configs={'pkg2': b'mycfg2'}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'pkg1', None, None),
                  makeop('install', 'pkg2', b'mycfg2', None),
                  makeop('install', 'pkg3', None, None)])

    def test_package_ops_config_only(self):
        ret = get_package_ops(
            packages=None,
            configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
        self.assertEqual(
            ret, [makeop('config', 'pkg2', b'mycfg2')])

    def test_package_ops_install_and_config(self):
        ret = get_package_ops(
            packages=['pkg3', 'pkg2'],
            configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
            installed=['xinstalled'])
        self.assertEqual(
            ret, [makeop('install', 'pkg3'),
                  makeop('install', 'pkg2', b'mycfg2'),
                  makeop('config', 'xinstalled', b'xcfg')])

    def test_package_ops_install_long_config_short(self):
        # a package can be installed by full name, but have config by short
        cfg = {'k1': 'k2'}
        ret = get_package_ops(
            packages=['config-example.canonical'],
            configs={'config-example': cfg}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'config-example.canonical', cfg)])

    def test_package_ops_with_file(self):
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
             "snapf2.snap": b"foo2", "foo.bar": "ignored"})
        ret = get_package_ops(
            packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config"),
             makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
             makeop('install', 'pkg1')])

    def test_package_ops_common_filename(self):
        # fish package name from filename
        # package names likely look like: pkgname.namespace_version_arch.snap

        # find filenames
        self.populate_tmp(
            {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
             "pkg-ws.config": "pkg-ws-config",
             "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
             "pkg1.smoser.config": "pkg1.smoser.config-data",
             "pkg1.config": "pkg1.config-data",
             "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
             "pkg2.smoser_0.0_amd64.config": "pkg2.config"})

        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
                         path="pkg-ws.smoser_0.3.4_all.snap",
                         cfgfile="pkg-ws.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
                         path="pkg1.smoser_1.2.3_all.snap",
                         cfgfile="pkg1.smoser.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
                         path="pkg2.smoser_0.0_amd64.snap",
                         cfgfile="pkg2.smoser_0.0_amd64.config"),
             ])

    def test_package_ops_config_overrides_file(self):
        # config data overrides local file .config
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
        ret = get_package_ops(
            packages=[], configs={'snapf1': 'snapf1cfg-config'},
            installed=[], fspath=self.tmp)
        self.assertEqual(
            ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
                              path="snapf1.snap",
                              config="snapf1cfg-config")])

    def test_package_ops_namespacing(self):
        cfgs = {
            'config-example': {'k1': 'v1'},
            'pkg1': {'p1': 'p2'},
            'ubuntu-core': {'c1': 'c2'},
            'notinstalled.smoser': {'s1': 's2'},
        }
        ret = get_package_ops(
            packages=['config-example.canonical'], configs=cfgs,
            installed=['config-example.smoser', 'pkg1.canonical',
                       'ubuntu-core'])

        expected_configs = [
            makeop('config', 'pkg1', config=cfgs['pkg1']),
            makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
        expected_installs = [
            makeop('install', 'config-example.canonical',
                   config=cfgs['config-example'])]

        installs = [i for i in ret if i['op'] == 'install']
        configs = [c for c in ret if c['op'] == 'config']

        self.assertEqual(installs, expected_installs)
        # configs are not ordered
        self.assertEqual(len(configs), len(expected_configs))
        self.assertTrue(all(found in expected_configs for found in configs))

    def test_render_op_localsnap(self):
        self.populate_tmp({"snapf1.snap": b"foo1"})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], None]])

    def test_render_op_localsnap_localconfig(self):
        self.populate_tmp(
            {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap', cfgfile='snapf1.config')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], 'snapf1cfg']])

    def test_render_op_snap(self):
        op = makeop('install', 'snapf1')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', 'snapf1', None]])

    def test_render_op_snap_config(self):
        mycfg = {'key1': 'value1'}
        name = "snapf1"
        op = makeop('install', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', name, {'config': {name: mycfg}}]])

    def test_render_op_config_bytes(self):
        name = "snapf1"
        mycfg = b'myconfig'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_string(self):
        name = 'snapf1'
        mycfg = 'myconfig: foo\nhisconfig: bar\n'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_dict(self):
        # config entry for package can be a dict, not a string blob
        mycfg = {'foo': 'bar'}
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        # snapcmds is a list of 3-entry lists. data_found will be the
        # blob of data in the file in 'snappy install --config='
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_list(self):
        # config entry for package can be a list, not a string blob
        mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
        name = "snapf1"
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_int(self):
        # config entry for package can be an int, not a string blob
        mycfg = 1
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_long_configs_short(self):
        # install a namespaced package should have un-namespaced config
        mycfg = {'k1': 'k2'}
        name = 'snapf1'
        op = makeop('install', name + ".smoser", config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_does_not_pad_cfgfile(self):
        # package_ops with cfgfile should not modify --file= content.
        mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config")])

        # now the op was ok, but test that render didn't mess it up.
        render_snap_op(**ret[0])
        data_found = self.snapcmds[0][2]
        # the data found gets loaded in the snapcmd interpretation
        # so this comparison is a bit lossy, but input to snappy config
        # is expected to be yaml loadable, so it should be OK.
        self.assertEqual(yaml.safe_load(mydata), data_found)


class TestSnapConfig(FilesystemMockingTestCase):
    """Tests for cc_snap_config: add_assertions, add_snap_user, handle."""

    # NOTE(review): the line layout of the two fixtures below was
    # reconstructed (one header / one base64 chunk per line).  Blank
    # separator lines, if the fixtures originally had any, could not be
    # recovered -- confirm against the upstream fixture if the exact
    # bytes matter.  The tests here only pass the strings through mocks.
    SYSTEM_USER_ASSERTION = textwrap.dedent("""
    type: system-user
    authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    email: foo@bar.com
    password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
    series:
    - 16
    since: 2016-09-10T16:34:00+03:00
    until: 2017-11-10T16:34:00+03:00
    username: baz
    sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj
    AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
    Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
    zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
    s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
    +to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
    Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
    d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
    BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
    f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
    v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q==""")

    ACCOUNT_ASSERTION = textwrap.dedent("""
    type: account-key
    authority-id: canonical
    revision: 2
    public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
    account-id: canonical
    name: store
    since: 2016-04-01T00:00:00.0Z
    body-length: 717
    sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH
    AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
    qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
    vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
    UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
    Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
    o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
    VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
    2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
    Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
    vUvV7RjVzv17ut0AEQEAAQ==
    AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
    WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
    nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
    3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
    eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
    inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
    rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
    rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
    aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
    6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
    haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
    yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
    HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
    skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
    CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
    ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
    qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
    IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
    oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k""")

    test_assertions = [ACCOUNT_ASSERTION, SYSTEM_USER_ASSERTION]

    def setUp(self):
        """Create a temporary root directory, removed again on cleanup."""
        super(TestSnapConfig, self).setUp()
        self.subp = util.subp
        self.new_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.new_root)

    def _get_cloud(self, distro, metadata=None):
        """Build a cloud.Cloud for 'distro' backed by DataSourceNone.

        Any 'metadata' given is merged into the datasource's metadata.
        """
        self.patchUtils(self.new_root)
        paths = helpers.Paths({})
        cls = distros.fetch(distro)
        mydist = cls(distro, {}, paths)
        myds = DataSourceNone.DataSourceNone({}, mydist, paths)
        if metadata:
            myds.metadata.update(metadata)
        return cloud.Cloud(myds, paths, {}, mydist, None)

    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions(self, msubp, mwrite):
        add_assertions(self.test_assertions)

        combined = "\n".join(self.test_assertions)
        mwrite.assert_any_call(ASSERTIONS_FILE, combined.encode('utf-8'))
        msubp.assert_called_with(['snap', 'ack', ASSERTIONS_FILE],
                                 capture=True)

    def test_snap_config_add_assertions_empty(self):
        self.assertRaises(ValueError, add_assertions, [])

    def test_add_assertions_nonlist(self):
        self.assertRaises(ValueError, add_assertions, {})

    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions_ack_fails(self, msubp, mwrite):
        msubp.side_effect = [util.ProcessExecutionError("Invalid assertion")]
        self.assertRaises(util.ProcessExecutionError, add_assertions,
                          self.test_assertions)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_no_config(self, mock_util, mock_add):
        cfg = {}
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        snap_handle('snap_config', cfg, cc, LOG, None)
        mock_add.assert_not_called()

    def test_snap_config_add_snap_user_no_config(self):
        usercfg = add_snap_user(cfg=None)
        self.assertIsNone(usercfg)

    def test_snap_config_add_snap_user_not_dict(self):
        cfg = ['foobar']
        self.assertRaises(ValueError, add_snap_user, cfg)

    def test_snap_config_add_snap_user_no_email(self):
        cfg = {'assertions': [], 'known': True}
        usercfg = add_snap_user(cfg=cfg)
        self.assertIsNone(usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_only(self, mock_util):
        email = 'janet@planetjanet.org'
        cfg = {'email': email}
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        usercfg = add_snap_user(cfg=cfg)

        self.assertEqual(usercfg, {'snapuser': email, 'known': False})

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_known(self, mock_util):
        email = 'janet@planetjanet.org'
        known = True
        cfg = {'email': email, 'known': known}
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        usercfg = add_snap_user(cfg=cfg)

        self.assertEqual(usercfg, {'snapuser': email, 'known': known})

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_system_not_snappy(self, mock_util, mock_add):
        cfg = {'snappy': {'assertions': self.test_assertions}}
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = False

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser(self, mock_util, mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        usercfg = {'snapuser': email, 'known': False}
        cc.distro.create_user.assert_called_with(email, **usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known(self, mock_util, mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        usercfg = {'snapuser': email, 'known': True}
        cc.distro.create_user.assert_called_with(email, **usercfg)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_managed(self, mock_util,
                                                       mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("true\n", ""),  # snap managed
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        cc.distro.create_user.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_no_assertion(self, mock_util,
                                                            mock_add):
        email = 'janet@planetjanet.org'
        cfg = {
            'snappy': {
                'assertions': [self.ACCOUNT_ASSERTION],
                'email': email,
                'known': True,
            }
        }
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("true\n", ""),  # snap managed
            ("", ""),  # snap known system-user
        ]

        snap_handle('snap_config', cfg, cc, LOG, None)

        mock_add.assert_called_with([self.ACCOUNT_ASSERTION])
        cc.distro.create_user.assert_not_called()


def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
    """Call makeop with 'path' and 'cfgfile' made relative to 'tmpd'.

    Falsy 'path' / 'cfgfile' values are passed through unchanged.
    """
    # NOTE(review): os.path.sep.join is kept (rather than os.path.join)
    # because the results are compared for equality with what
    # get_package_ops returns -- presumably built the same way; verify
    # before changing.
    if cfgfile:
        cfgfile = os.path.sep.join([tmpd, cfgfile])
    if path:
        path = os.path.sep.join([tmpd, path])
    return makeop(op=op, name=name, config=config, path=path,
                  cfgfile=cfgfile)


def apply_patches(patches):
    """Set attributes described by 'patches' and return the undo list.

    'patches' is an iterable of (ref, name, replace) tuples; each sets
    attribute 'name' on 'ref' to 'replace'.  Entries whose 'replace' is
    None are skipped.  The return value is a list of (ref, name, orig)
    tuples in application order, which can be passed back to this
    function to restore the original attributes.
    """
    ret = []
    for (ref, name, replace) in patches:
        if replace is None:
            continue
        orig = getattr(ref, name)
        setattr(ref, name, replace)
        ret.append((ref, name, orig))
    return ret

# vi: ts=4 expandtab