summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test_data.py32
-rw-r--r--tests/unittests/test_datasource/test_gce.py49
-rw-r--r--tests/unittests/test_datasource/test_smartos.py230
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py311
4 files changed, 555 insertions, 67 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 4f24e2dd..1b15dafa 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -494,10 +494,10 @@ c: 4
])
def test_mime_application_octet_stream(self):
- """Mime message of type application/octet-stream is ignored but shows warning."""
+ """Mime type application/octet-stream is ignored but shows warning."""
ci = stages.Init()
message = MIMEBase("application", "octet-stream")
- message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc\xbf')
+ message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
encoders.encode_base64(message)
ci.datasource = FakeDataSource(message.as_string().encode())
@@ -511,6 +511,34 @@ c: 4
mockobj.assert_called_once_with(
ci.paths.get_ipath("cloud_config"), "", 0o600)
+
+ def test_cloud_config_archive(self):
+ non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
+ data = [{'content': '#cloud-config\npassword: gocubs\n'},
+ {'content': '#cloud-config\nlocale: chicago\n'},
+ {'content': non_decodable}]
+ message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(message)
+
+ fs = {}
+
+ def fsstore(filename, content, mode=0o0644, omode="wb"):
+ fs[filename] = content
+
+ # consuming the user-data provided should write 'cloud_config' file
+ # which will have our yaml in it.
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ mockobj.side_effect = fsstore
+ ci.fetch()
+ ci.consume_data()
+
+ cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
+ self.assertEqual(cfg.get('password'), 'gocubs')
+ self.assertEqual(cfg.get('locale'), 'chicago')
+
+
class TestUDProcess(helpers.ResourceUsingTestCase):
def test_bytes_in_userdata(self):
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 4280abc4..1fb100f7 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -113,10 +113,6 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(GCE_META.get('instance/attributes/user-data'),
self.ds.get_userdata_raw())
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
-
# test partial metadata (missing user-data in particular)
@httpretty.activate
def test_metadata_partial(self):
@@ -141,3 +137,48 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
decoded = b64decode(
GCE_META_ENCODING.get('instance/attributes/user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
+
+ @httpretty.activate
+ def test_missing_required_keys_return_false(self):
+ for required_key in ['instance/id', 'instance/zone',
+ 'instance/hostname']:
+ meta = GCE_META_PARTIAL.copy()
+ del meta[required_key]
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.assertEqual(False, self.ds.get_data())
+ httpretty.reset()
+
+ @httpretty.activate
+ def test_project_level_ssh_keys_are_used(self):
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback())
+ self.ds.get_data()
+
+ # we expect a list of public ssh keys with user names stripped
+ self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
+ self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_ssh_keys_are_used(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertIn(key_content, self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_keys_replace_project_level_keys(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertEqual([key_content], self.ds.get_public_ssh_keys())
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 8b62b1b1..28b41eaf 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,18 +24,30 @@
from __future__ import print_function
-from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
-from cloudinit.util import b64e
-from .. import helpers
import os
import os.path
import re
import shutil
-import tempfile
import stat
+import tempfile
import uuid
+from binascii import crc32
+import serial
+import six
+
+import six
+
+from cloudinit import helpers as c_helpers
+from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
+
+from .. import helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -54,60 +66,15 @@ MOCK_RETURNS = {
DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
-class MockSerial(object):
- """Fake a serial terminal for testing the code that
- interfaces with the serial"""
-
- port = None
+def get_mock_client(mockdata):
+ class MockMetadataClient(object):
- def __init__(self, mockdata):
- self.last = None
- self.last = None
- self.new = True
- self.count = 0
- self.mocked_out = []
- self.mockdata = mockdata
+ def __init__(self, serial):
+ pass
- def open(self):
- return True
-
- def close(self):
- return True
-
- def isOpen(self):
- return True
-
- def write(self, line):
- line = line.replace('GET ', '')
- self.last = line.rstrip()
-
- def readline(self):
- if self.new:
- self.new = False
- if self.last in self.mockdata:
- return 'SUCCESS\n'
- else:
- return 'NOTFOUND %s\n' % self.last
-
- if self.last in self.mockdata:
- if not self.mocked_out:
- self.mocked_out = [x for x in self._format_out()]
-
- if len(self.mocked_out) > self.count:
- self.count += 1
- return self.mocked_out[self.count - 1]
-
- def _format_out(self):
- if self.last in self.mockdata:
- _mret = self.mockdata[self.last]
- try:
- for l in _mret.splitlines():
- yield "%s\n" % l.rstrip()
- except:
- yield "%s\n" % _mret.rstrip()
-
- yield '.'
- yield '\n'
+ def get_metadata(self, metadata_key):
+ return mockdata.get(metadata_key)
+ return MockMetadataClient
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
@@ -155,9 +122,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if dmi_data is None:
dmi_data = DMI_DATA_RETURN
- def _get_serial(*_):
- return MockSerial(mockdata)
-
def _dmi_data():
return dmi_data
@@ -174,7 +138,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'get_serial', mock.MagicMock())])
+ self.apply_patches([
+ (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
self.apply_patches([(mod, 'device_exists', lambda d: True)])
@@ -443,6 +409,18 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.assertEqual(dsrc.device_name_to_device('FOO'),
mydscfg['disk_aliases']['FOO'])
+ @mock.patch('cloudinit.sources.DataSourceSmartOS.JoyentMetadataClient')
+ @mock.patch('cloudinit.sources.DataSourceSmartOS.get_serial')
+ def test_serial_console_closed_on_error(self, get_serial, metadata_client):
+ class OurException(Exception):
+ pass
+ metadata_client.side_effect = OurException
+ try:
+ DataSourceSmartOS.query_data('noun', 'device', 0)
+ except OurException:
+ pass
+ self.assertEqual(1, get_serial.return_value.close.call_count)
+
def apply_patches(patches):
ret = []
@@ -453,3 +431,133 @@ def apply_patches(patches):
setattr(ref, name, replace)
ret.append((ref, name, orig))
return ret
+
+
+class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestJoyentMetadataClient, self).setUp()
+ self.serial = mock.MagicMock(spec=serial.Serial)
+ self.request_id = 0xabcdef12
+ self.metadata_value = 'value'
+ self.response_parts = {
+ 'command': 'SUCCESS',
+ 'crc': 'b5a9ff00',
+ 'length': 17 + len(b64e(self.metadata_value)),
+ 'payload': b64e(self.metadata_value),
+ 'request_id': '{0:08x}'.format(self.request_id),
+ }
+
+ def make_response():
+ payload = ''
+ if self.response_parts['payload']:
+ payload = ' {0}'.format(self.response_parts['payload'])
+ del self.response_parts['payload']
+ return (
+ 'V2 {length} {crc} {request_id} {command}{payload}\n'.format(
+ payload=payload, **self.response_parts).encode('ascii'))
+ self.serial.readline.side_effect = make_response
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
+ mock.Mock(return_value=self.request_id)))
+
+ def _get_client(self):
+ return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+
+ def assertEndsWith(self, haystack, prefix):
+ self.assertTrue(haystack.endswith(prefix),
+ "{0} does not end with '{1}'".format(
+ repr(haystack), prefix))
+
+ def assertStartsWith(self, haystack, prefix):
+ self.assertTrue(haystack.startswith(prefix),
+ "{0} does not start with '{1}'".format(
+ repr(haystack), prefix))
+
+ def test_get_metadata_writes_a_single_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(1, self.serial.write.call_count)
+ written_line = self.serial.write.call_args[0][0]
+ self.assertEndsWith(written_line, b'\n')
+ self.assertEqual(1, written_line.count(b'\n'))
+
+ def _get_written_line(self, key='some_key'):
+ client = self._get_client()
+ client.get_metadata(key)
+ return self.serial.write.call_args[0][0]
+
+ def test_get_metadata_writes_bytes(self):
+ self.assertIsInstance(self._get_written_line(), six.binary_type)
+
+ def test_get_metadata_line_starts_with_v2(self):
+ self.assertStartsWith(self._get_written_line(), b'V2')
+
+ def test_get_metadata_uses_get_command(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ self.assertEqual('GET', parts[4])
+
+ def test_get_metadata_base64_encodes_argument(self):
+ key = 'my_key'
+ parts = self._get_written_line(key).decode('ascii').strip().split(' ')
+ self.assertEqual(b64e(key), parts[5])
+
+ def test_get_metadata_calculates_length_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_length = len(' '.join(parts[3:]))
+ self.assertEqual(expected_length, int(parts[1]))
+
+ def test_get_metadata_uses_appropriate_request_id(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ request_id = parts[3]
+ self.assertEqual(8, len(request_id))
+ self.assertEqual(request_id, request_id.lower())
+
+ def test_get_metadata_uses_random_number_for_request_id(self):
+ line = self._get_written_line()
+ request_id = line.decode('ascii').strip().split(' ')[3]
+ self.assertEqual('{0:08x}'.format(self.request_id), request_id)
+
+ def test_get_metadata_checksums_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_checksum = '{0:08x}'.format(
+ crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
+ checksum = parts[2]
+ self.assertEqual(expected_checksum, checksum)
+
+ def test_get_metadata_reads_a_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(1, self.serial.readline.call_count)
+
+ def test_get_metadata_returns_valid_value(self):
+ client = self._get_client()
+ value = client.get_metadata('some_key')
+ self.assertEqual(self.metadata_value, value)
+
+ def test_get_metadata_throws_exception_for_incorrect_length(self):
+ self.response_parts['length'] = 0
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_incorrect_crc(self):
+ self.response_parts['crc'] = 'deadbeef'
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_request_id_mismatch(self):
+ self.response_parts['request_id'] = 'deadbeef'
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_returns_None_if_value_not_found(self):
+ self.response_parts['payload'] = ''
+ self.response_parts['command'] = 'NOTFOUND'
+ self.response_parts['length'] = 17
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertIsNone(client.get_metadata('some_key'))
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
new file mode 100644
index 00000000..f3109bac
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -0,0 +1,311 @@
+from cloudinit.config.cc_snappy import (
+ makeop, get_package_ops, render_snap_op)
+from cloudinit import util
+from .. import helpers as t_help
+
+import os
+import shutil
+import tempfile
+import yaml
+
+ALLOWED = (dict, list, int, str)
+
+
+class TestInstallPackages(t_help.TestCase):
+ def setUp(self):
+ super(TestInstallPackages, self).setUp()
+ self.unapply = []
+
+ # by default 'which' has nothing in its path
+ self.apply_patches([(util, 'subp', self._subp)])
+ self.subp_called = []
+ self.snapcmds = []
+ self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ shutil.rmtree(self.tmp)
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def populate_tmp(self, files):
+ return t_help.populate_dir(self.tmp, files)
+
+ def _subp(self, *args, **kwargs):
+ # supports subp calling with cmd as args or kwargs
+ if 'args' not in kwargs:
+ kwargs['args'] = args[0]
+ self.subp_called.append(kwargs)
+ snap_cmds = []
+ args = kwargs['args']
+ # here we basically parse the snappy command invoked
+ # and append to snapcmds a list of (mode, pkg, config)
+ if args[0:2] == ['snappy', 'config']:
+ if args[3] == "-":
+ config = kwargs.get('data', '')
+ else:
+ with open(args[3], "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['config', args[2], config])
+ elif args[0:2] == ['snappy', 'install']:
+ config = None
+ pkg = None
+ for arg in args[2:]:
+ if arg.startswith("-"):
+ continue
+ if not pkg:
+ pkg = arg
+ elif not config:
+ cfgfile = arg
+ if cfgfile == "-":
+ config = kwargs.get('data', '')
+ elif cfgfile:
+ with open(cfgfile, "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['install', pkg, config])
+
+ def test_package_ops_1(self):
+ ret = get_package_ops(
+ packages=['pkg1', 'pkg2', 'pkg3'],
+ configs={'pkg2': b'mycfg2'}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg1', None, None),
+ makeop('install', 'pkg2', b'mycfg2', None),
+ makeop('install', 'pkg3', None, None)])
+
+ def test_package_ops_config_only(self):
+ ret = get_package_ops(
+ packages=None,
+ configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
+ self.assertEqual(
+ ret, [makeop('config', 'pkg2', b'mycfg2')])
+
+ def test_package_ops_install_and_config(self):
+ ret = get_package_ops(
+ packages=['pkg3', 'pkg2'],
+ configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
+ installed=['xinstalled'])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg3'),
+ makeop('install', 'pkg2', b'mycfg2'),
+ makeop('config', 'xinstalled', b'xcfg')])
+
+ def test_package_ops_install_long_config_short(self):
+ # a package can be installed by full name, but have config by short
+ cfg = {'k1': 'k2'}
+ ret = get_package_ops(
+ packages=['config-example.canonical'],
+ configs={'config-example': cfg}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'config-example.canonical', cfg)])
+
+ def test_package_ops_with_file(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
+ "snapf2.snap": b"foo2", "foo.bar": "ignored"})
+ ret = get_package_ops(
+ packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config"),
+ makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
+ makeop('install', 'pkg1')])
+
+ def test_package_ops_common_filename(self):
+ # fish package name from filename
+ # package names likely look like: pkgname.namespace_version_arch.snap
+ fname = "xkcd-webserver.canonical_0.3.4_all.snap"
+ name = "xkcd-webserver.canonical"
+ shortname = "xkcd-webserver"
+
+ # find filenames
+ self.populate_tmp(
+ {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
+ "pkg-ws.config": "pkg-ws-config",
+ "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
+ "pkg1.smoser.config": "pkg1.smoser.config-data",
+ "pkg1.config": "pkg1.config-data",
+ "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
+ "pkg2.smoser_0.0_amd64.config": "pkg2.config",
+ })
+
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
+ path="pkg-ws.smoser_0.3.4_all.snap",
+ cfgfile="pkg-ws.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
+ path="pkg1.smoser_1.2.3_all.snap",
+ cfgfile="pkg1.smoser.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
+ path="pkg2.smoser_0.0_amd64.snap",
+ cfgfile="pkg2.smoser_0.0_amd64.config"),
+ ])
+
+ def test_package_ops_config_overrides_file(self):
+ # config data overrides local file .config
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
+ ret = get_package_ops(
+ packages=[], configs={'snapf1': 'snapf1cfg-config'},
+ installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path="snapf1.snap", config="snapf1cfg-config")])
+
+ def test_package_ops_namespacing(self):
+ cfgs = {
+ 'config-example': {'k1': 'v1'},
+ 'pkg1': {'p1': 'p2'},
+ 'ubuntu-core': {'c1': 'c2'},
+ 'notinstalled.smoser': {'s1': 's2'},
+ }
+ cfg = {'config-example-k1': 'config-example-k2'}
+ ret = get_package_ops(
+ packages=['config-example.canonical'], configs=cfgs,
+ installed=['config-example.smoser', 'pkg1.canonical',
+ 'ubuntu-core'])
+
+ expected_configs = [
+ makeop('config', 'pkg1', config=cfgs['pkg1']),
+ makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
+ expected_installs = [
+ makeop('install', 'config-example.canonical',
+ config=cfgs['config-example'])]
+
+ installs = [i for i in ret if i['op'] == 'install']
+ configs = [c for c in ret if c['op'] == 'config']
+
+ self.assertEqual(installs, expected_installs)
+ # configs are not ordered
+ self.assertEqual(len(configs), len(expected_configs))
+ self.assertTrue(all(found in expected_configs for found in configs))
+
+ def test_render_op_localsnap(self):
+ self.populate_tmp({"snapf1.snap": b"foo1"})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], None]])
+
+ def test_render_op_localsnap_localconfig(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap', cfgfile='snapf1.config')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], 'snapf1cfg']])
+
+ def test_render_op_snap(self):
+ op = makeop('install', 'snapf1')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', 'snapf1', None]])
+
+ def test_render_op_snap_config(self):
+ mycfg = {'key1': 'value1'}
+ name = "snapf1"
+ op = makeop('install', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', name, {'config': {name: mycfg}}]])
+
+ def test_render_op_config_bytes(self):
+ name = "snapf1"
+ mycfg = b'myconfig'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_string(self):
+ name = 'snapf1'
+ mycfg = 'myconfig: foo\nhisconfig: bar\n'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_dict(self):
+ # config entry for package can be a dict, not a string blob
+ mycfg = {'foo': 'bar'}
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ # snapcmds is a list of 3-entry lists. data_found will be the
+ # blob of data in the file in 'snappy install --config=<file>'
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_list(self):
+ # config entry for package can be a list, not a string blob
+ mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
+ name = "snapf1"
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_int(self):
+ # config entry for package can be a list, not a string blob
+ mycfg = 1
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_long_configs_short(self):
+ # install a namespaced package should have un-namespaced config
+ mycfg = {'k1': 'k2'}
+ name = 'snapf1'
+ op = makeop('install', name + ".smoser", config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_does_not_pad_cfgfile(self):
+ # package_ops with cfgfile should not modify --file= content.
+ mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config")])
+
+ # now the op was ok, but test that render didn't mess it up.
+ render_snap_op(**ret[0])
+ data_found = self.snapcmds[0][2]
+ # the data found gets loaded in the snapcmd interpretation
+ # so this comparison is a bit lossy, but input to snappy config
+ # is expected to be yaml loadable, so it should be OK.
+ self.assertEqual(yaml.safe_load(mydata), data_found)
+
+
+def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
+ if cfgfile:
+ cfgfile = os.path.sep.join([tmpd, cfgfile])
+ if path:
+ path = os.path.sep.join([tmpd, path])
+ return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile))
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret