Diffstat (limited to 'tests')
 tests/unittests/helpers.py                                   |  42
 tests/unittests/test__init__.py                              |   8
 tests/unittests/test_data.py                                 |  46
 tests/unittests/test_datasource/test_azure.py                | 217
 tests/unittests/test_datasource/test_configdrive.py          |  15
 tests/unittests/test_datasource/test_digitalocean.py         |   3
 tests/unittests/test_datasource/test_gce.py                  |  54
 tests/unittests/test_datasource/test_maas.py                 |   8
 tests/unittests/test_datasource/test_nocloud.py              |  14
 tests/unittests/test_datasource/test_openstack.py            |   8
 tests/unittests/test_datasource/test_smartos.py              | 228
 tests/unittests/test_ec2_util.py                             |   4
 tests/unittests/test_handler/test_handler_apt_configure.py   |  13
 tests/unittests/test_handler/test_handler_disk_setup.py      |  30
 tests/unittests/test_handler/test_handler_snappy.py          | 306
 tests/unittests/test_pathprefix2dict.py                      |  10
 tests/unittests/test_templating.py                           |   5
 tests/unittests/test_util.py                                 |  96
 18 files changed, 909 insertions(+), 198 deletions(-)
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 7516bd02..61a1f6ff 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,5 +1,6 @@
from __future__ import print_function
+import functools
import os
import sys
import shutil
@@ -25,9 +26,10 @@ PY2 = False
PY26 = False
PY27 = False
PY3 = False
+FIX_HTTPRETTY = False
_PY_VER = sys.version_info
-_PY_MAJOR, _PY_MINOR = _PY_VER[0:2]
+_PY_MAJOR, _PY_MINOR, _PY_MICRO = _PY_VER[0:3]
if (_PY_MAJOR, _PY_MINOR) <= (2, 6):
if (_PY_MAJOR, _PY_MINOR) == (2, 6):
PY26 = True
@@ -39,6 +41,8 @@ else:
PY2 = True
if (_PY_MAJOR, _PY_MINOR) >= (3, 0):
PY3 = True
+ if _PY_MINOR == 4 and _PY_MICRO < 3:
+ FIX_HTTPRETTY = True
if PY26:
# For now add these on, taken from python 2.7 + slightly adjusted. Drop
@@ -268,6 +272,37 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
mock.patch.object(sys, 'stderr', stderr))
+def import_httpretty():
+ """Import HTTPretty and monkey patch Python 3.4 issue.
+ See https://github.com/gabrielfalcao/HTTPretty/pull/193 and
+ https://github.com/gabrielfalcao/HTTPretty/issues/221.
+
+ Lifted from
+ https://github.com/inveniosoftware/datacite/blob/master/tests/helpers.py
+ """
+ if not FIX_HTTPRETTY:
+ import httpretty
+ else:
+ import socket
+ old_SocketType = socket.SocketType
+
+ import httpretty
+ from httpretty import core
+
+ def sockettype_patch(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ f(*args, **kwargs)
+ socket.SocketType = old_SocketType
+ socket.__dict__['SocketType'] = old_SocketType
+ return inner
+
+ core.httpretty.disable = sockettype_patch(
+ httpretty.httpretty.disable
+ )
+ return httpretty
+
+
class HttprettyTestCase(TestCase):
# necessary as http_proxy gets in the way of httpretty
# https://github.com/gabrielfalcao/HTTPretty/issues/122
@@ -288,7 +323,10 @@ def populate_dir(path, files):
os.makedirs(path)
for (name, content) in files.items():
with open(os.path.join(path, name), "wb") as fp:
- fp.write(content.encode('utf-8'))
+ if isinstance(content, six.binary_type):
+ fp.write(content)
+ else:
+ fp.write(content.encode('utf-8'))
fp.close()
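
A note on the populate_dir change above: under Python 3 a file opened in binary mode only accepts bytes, and bytes objects have no .encode(), so the unconditional content.encode('utf-8') breaks once fixtures start passing raw bytes (as several datasource tests below now do). A minimal sketch of the same pattern outside the cloud-init helpers, assuming six is available:

import six

def write_fixture(path, content):
    # Encode only text; pass bytes through untouched, mirroring the
    # six.binary_type check added to populate_dir above.
    if not isinstance(content, six.binary_type):
        content = content.encode('utf-8')
    with open(path, 'wb') as fp:
        fp.write(content)

# write_fixture('user-data', b'#!/bin/sh\necho hi\n')   # bytes written as-is
# write_fixture('meta-data', 'instance-id: IID\n')      # text gets encoded
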
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 1a307e56..c32783a6 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -181,7 +181,7 @@ class TestCmdlineUrl(unittest.TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
- payload = "0"
+ payload = b"0"
cmdline = "ro %s=%s bar=1" % (key, url)
with mock.patch('cloudinit.url_helper.readurl',
@@ -194,13 +194,13 @@ class TestCmdlineUrl(unittest.TestCase):
def test_valid_content(self):
url = "http://example.com/foo"
key = "mykey"
- payload = "xcloud-config\nmydata: foo\nbar: wark\n"
+ payload = b"xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
with mock.patch('cloudinit.url_helper.readurl',
return_value=url_helper.StringResponse(payload)):
self.assertEqual(
- util.get_cmdline_url(names=[key], starts="xcloud-config",
+ util.get_cmdline_url(names=[key], starts=b"xcloud-config",
cmdline=cmdline),
(key, url, payload))
@@ -210,7 +210,7 @@ class TestCmdlineUrl(unittest.TestCase):
cmdline = "ro %s=%s bar=1" % (key, url)
with mock.patch('cloudinit.url_helper.readurl',
- return_value=url_helper.StringResponse('')):
+ return_value=url_helper.StringResponse(b'')):
self.assertEqual(
util.get_cmdline_url(names=["does-not-appear"],
starts="#cloud-config", cmdline=cmdline),
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 48475515..c603bfdb 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -13,6 +13,7 @@ except ImportError:
from six import BytesIO, StringIO
+from email import encoders
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
@@ -58,7 +59,6 @@ def gzip_text(text):
return contents.getvalue()
-
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
@@ -493,6 +493,50 @@ c: 4
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
])
+ def test_mime_application_octet_stream(self):
+ """Mime type application/octet-stream is ignored but logs a warning."""
+ ci = stages.Init()
+ message = MIMEBase("application", "octet-stream")
+ message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
+ encoders.encode_base64(message)
+ ci.datasource = FakeDataSource(message.as_string().encode())
+
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (application/octet-stream)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
+
+ def test_cloud_config_archive(self):
+ non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
+ data = [{'content': '#cloud-config\npassword: gocubs\n'},
+ {'content': '#cloud-config\nlocale: chicago\n'},
+ {'content': non_decodable}]
+ message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(message)
+
+ fs = {}
+
+ def fsstore(filename, content, mode=0o0644, omode="wb"):
+ fs[filename] = content
+
+ # consuming the user-data provided should write 'cloud_config' file
+ # which will have our yaml in it.
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ mockobj.side_effect = fsstore
+ ci.fetch()
+ ci.consume_data()
+
+ cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
+ self.assertEqual(cfg.get('password'), 'gocubs')
+ self.assertEqual(cfg.get('locale'), 'chicago')
+
class TestUDProcess(helpers.ResourceUsingTestCase):
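
For reference, test_cloud_config_archive above exercises the #cloud-config-archive user-data format: a YAML list of parts, each carrying a 'content' blob, prefixed by the archive marker line. A rough reconstruction of the payload the test builds, using only names that appear in the test itself:

from cloudinit import util

parts = [
    {'content': '#cloud-config\npassword: gocubs\n'},
    {'content': '#cloud-config\nlocale: chicago\n'},
]
# The archive is the marker line followed by the YAML dump of the parts.
user_data = b'#cloud-config-archive\n' + util.yaml_dumps(parts).encode()

Each part is then handled according to its own '#cloud-config' header, which is why the test expects the merged cloud_config to contain both the password and locale keys.
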
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 8112c69b..7e789853 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -116,9 +116,6 @@ class TestAzureDataSource(TestCase):
data['iid_from_shared_cfg'] = path
return 'i-my-azure-id'
- def _apply_hostname_bounce(**kwargs):
- data['apply_hostname_bounce'] = kwargs
-
if data.get('ovfcontent') is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
@@ -132,7 +129,9 @@ class TestAzureDataSource(TestCase):
(mod, 'wait_for_files', _wait_for_files),
(mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
(mod, 'iid_from_shared_config', _iid_from_shared_config),
- (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
+ (mod, 'perform_hostname_bounce', mock.MagicMock()),
+ (mod, 'get_hostname', mock.MagicMock()),
+ (mod, 'set_hostname', mock.MagicMock()),
])
dsrc = mod.DataSourceAzureNet(
@@ -272,47 +271,6 @@ class TestAzureDataSource(TestCase):
for mypk in mypklist:
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
- def test_disabled_bounce(self):
- pass
-
- def test_apply_bounce_call_1(self):
- # hostname needs to get through to apply_hostname_bounce
- odata = {'HostName': 'my-random-hostname'}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- self._get_ds(data).get_data()
- self.assertIn('hostname', data['apply_hostname_bounce'])
- self.assertEqual(data['apply_hostname_bounce']['hostname'],
- odata['HostName'])
-
- def test_apply_bounce_call_configurable(self):
- # hostname_bounce should be configurable in datasource cfg
- cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
- 'command': 'my-bounce-command',
- 'hostname_command': 'my-hostname-command'}}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
-
- for k in cfg['hostname_bounce']:
- self.assertIn(k, data['apply_hostname_bounce'])
-
- for k, v in cfg['hostname_bounce'].items():
- self.assertEqual(data['apply_hostname_bounce'][k], v)
-
- def test_set_hostname_disabled(self):
- # config specifying set_hostname off should not bounce
- cfg = {'set_hostname': False}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
-
- self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
-
def test_default_ephemeral(self):
# make sure the ephemeral device works
odata = {}
@@ -425,6 +383,175 @@ class TestAzureDataSource(TestCase):
load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
+class TestAzureBounce(TestCase):
+
+ def mock_out_azure_moving_parts(self):
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'invoke_agent'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'wait_for_files'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'iid_from_shared_config',
+ mock.MagicMock(return_value='i-my-azure-id')))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
+ mock.MagicMock(return_value=[])))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'find_ephemeral_disk',
+ mock.MagicMock(return_value=None)))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'find_ephemeral_part',
+ mock.MagicMock(return_value=None)))
+
+ def setUp(self):
+ super(TestAzureBounce, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.addCleanup(shutil.rmtree, self.tmp)
+ DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.patches = ExitStack()
+ self.mock_out_azure_moving_parts()
+ self.get_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'get_hostname'))
+ self.set_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'set_hostname'))
+ self.subp = self.patches.enter_context(
+ mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
+
+ def tearDown(self):
+ self.patches.close()
+
+ def _get_ds(self, ovfcontent=None):
+ if ovfcontent is not None:
+ populate_dir(os.path.join(self.paths.seed_dir, "azure"),
+ {'ovf-env.xml': ovfcontent})
+ return DataSourceAzure.DataSourceAzureNet(
+ {}, distro=None, paths=self.paths)
+
+ def get_ovf_env_with_dscfg(self, hostname, cfg):
+ odata = {
+ 'HostName': hostname,
+ 'dscfg': {
+ 'text': b64e(yaml.dump(cfg)),
+ 'encoding': 'base64'
+ }
+ }
+ return construct_valid_ovf_env(data=odata)
+
+ def test_disabled_bounce_does_not_change_hostname(self):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_disabled_bounce_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_same_hostname_does_not_change_hostname(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_unchanged_hostname_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_force_performs_bounce_regardless(self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname(self):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(expected_hostname,
+ self.set_hostname.call_args_list[0][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_different_hostnames_performs_bounce(
+ self, perform_hostname_bounce):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname_back(self):
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_failure_in_bounce_still_resets_host_name(
+ self, perform_hostname_bounce):
+ perform_hostname_bounce.side_effect = Exception
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ def test_environment_correct_for_bounce_command(self):
+ interface = 'int0'
+ hostname = 'my-new-host'
+ old_hostname = 'my-old-host'
+ self.get_hostname.return_value = old_hostname
+ cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg(hostname, cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_env = self.subp.call_args[1]['env']
+ self.assertEqual(interface, bounce_env['interface'])
+ self.assertEqual(hostname, bounce_env['hostname'])
+ self.assertEqual(old_hostname, bounce_env['old_hostname'])
+
+ def test_default_bounce_command_used_by_default(self):
+ cmd = 'default-bounce-command'
+ DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_args = self.subp.call_args[1]['args']
+ self.assertEqual(cmd, bounce_args)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_set_hostname_option_can_disable_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_set_hostname_option_can_disable_hostname_set(self):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, self.set_hostname.call_count)
+
+
class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
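
On the TestAzureBounce environment test above: the assertions pin down that the configured hostname_bounce command is run through util.subp with 'interface', 'hostname' and 'old_hostname' exported in its environment. A hypothetical bounce script consuming that contract might look like the following; the script itself is illustrative and not part of cloud-init:

import os

# Values the tests assert cloud-init exports into the bounce command's
# environment via util.subp(..., env=...).
interface = os.environ.get('interface')        # e.g. 'int0'
new_hostname = os.environ.get('hostname')      # e.g. 'my-new-host'
old_hostname = os.environ.get('old_hostname')  # e.g. 'my-old-host'

print('bouncing %s: %s -> %s' % (interface, old_hostname, new_hostname))
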
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index e28bdd84..83aca505 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -2,6 +2,7 @@ from copy import copy
import json
import os
import shutil
+import six
import tempfile
try:
@@ -45,7 +46,7 @@ EC2_META = {
'reservation-id': 'r-iru5qm4m',
'security-groups': ['default']
}
-USER_DATA = '#!/bin/sh\necho This is user data\n'
+USER_DATA = b'#!/bin/sh\necho This is user data\n'
OSTACK_META = {
'availability_zone': 'nova',
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
@@ -56,8 +57,8 @@ OSTACK_META = {
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
-CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
CFG_DRIVE_FILES_V2 = {
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
@@ -346,8 +347,12 @@ def populate_dir(seed_dir, files):
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
- with open(path, "w") as fp:
+ if isinstance(content, six.text_type):
+ mode = "w"
+ else:
+ mode = "wb"
+
+ with open(path, mode) as fp:
fp.write(content)
- fp.close()
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index 98f9cfac..679d1b82 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -15,7 +15,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import httpretty
import re
from six.moves.urllib_parse import urlparse
@@ -26,6 +25,8 @@ from cloudinit.sources import DataSourceDigitalOcean
from .. import helpers as test_helpers
+httpretty = test_helpers.import_httpretty()
+
# Abbreviated for the test
DO_INDEX = """id
hostname
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 6dd4b5ed..1fb100f7 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -15,7 +15,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import httpretty
import re
from base64 import b64encode, b64decode
@@ -27,12 +26,14 @@ from cloudinit.sources import DataSourceGCE
from .. import helpers as test_helpers
+httpretty = test_helpers.import_httpretty()
+
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
'instance/hostname': 'server.project-foo.local',
- 'instance/attributes/user-data': '/bin/echo foo\n',
+ 'instance/attributes/user-data': b'/bin/echo foo\n',
}
GCE_META_PARTIAL = {
@@ -112,10 +113,6 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(GCE_META.get('instance/attributes/user-data'),
self.ds.get_userdata_raw())
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
-
# test partial metadata (missing user-data in particular)
@httpretty.activate
def test_metadata_partial(self):
@@ -140,3 +137,48 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
decoded = b64decode(
GCE_META_ENCODING.get('instance/attributes/user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
+
+ @httpretty.activate
+ def test_missing_required_keys_return_false(self):
+ for required_key in ['instance/id', 'instance/zone',
+ 'instance/hostname']:
+ meta = GCE_META_PARTIAL.copy()
+ del meta[required_key]
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.assertEqual(False, self.ds.get_data())
+ httpretty.reset()
+
+ @httpretty.activate
+ def test_project_level_ssh_keys_are_used(self):
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback())
+ self.ds.get_data()
+
+ # we expect a list of public ssh keys with user names stripped
+ self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
+ self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_ssh_keys_are_used(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertIn(key_content, self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_keys_replace_project_level_keys(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertEqual([key_content], self.ds.get_public_ssh_keys())
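
The GCE ssh-key tests above rely on the metadata sshKeys format '<user>:<key material>', with the user portion stripped before keys are exposed and instance-level keys taking precedence over project-level ones. A small sketch of that stripping step, inferred only from the fixtures used in these tests:

raw_ssh_keys = 'user:ssh-rsa AA2..+aRD0fyVw== root@server'

# Drop the '<user>:' prefix from each entry, keeping only the key material.
public_keys = [line.split(':', 1)[1]
               for line in raw_ssh_keys.splitlines() if ':' in line]
# public_keys == ['ssh-rsa AA2..+aRD0fyVw== root@server']
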
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index d25e1adc..f109bb04 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -26,7 +26,7 @@ class TestMAASDataSource(TestCase):
data = {'instance-id': 'i-valid01',
'local-hostname': 'valid01-hostname',
- 'user-data': 'valid01-userdata',
+ 'user-data': b'valid01-userdata',
'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
my_d = os.path.join(self.tmp, "valid")
@@ -46,7 +46,7 @@ class TestMAASDataSource(TestCase):
data = {'instance-id': 'i-valid-extra',
'local-hostname': 'valid-extra-hostname',
- 'user-data': 'valid-extra-userdata', 'foo': 'bar'}
+ 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
@@ -103,7 +103,7 @@ class TestMAASDataSource(TestCase):
'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
- 'user-data': 'foodata',
+ 'user-data': b'foodata',
}
valid_order = [
'meta-data/local-hostname',
@@ -143,7 +143,7 @@ class TestMAASDataSource(TestCase):
userdata, metadata = DataSourceMAAS.read_maas_seed_url(
my_seed, header_cb=my_headers_cb, version=my_ver)
- self.assertEqual("foodata", userdata)
+ self.assertEqual(b"foodata", userdata)
self.assertEqual(metadata['instance-id'],
valid['meta-data/instance-id'])
self.assertEqual(metadata['local-hostname'],
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 4f967f58..85b4c25a 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -37,7 +37,7 @@ class TestNoCloudDataSource(TestCase):
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = "USER_DATA_HERE"
+ ud = b"USER_DATA_HERE"
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
{'user-data': ud, 'meta-data': yaml.safe_dump(md)})
@@ -92,20 +92,20 @@ class TestNoCloudDataSource(TestCase):
data = {
'fs_label': None,
'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
- 'user-data': "USER_DATA_RAW",
+ 'user-data': b"USER_DATA_RAW",
}
sys_cfg = {'datasource': {'NoCloud': data}}
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
+ self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
self.assertTrue(ret)
def test_nocloud_seed_with_vendordata(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = "USER_DATA_HERE"
- vd = "THIS IS MY VENDOR_DATA"
+ ud = b"USER_DATA_HERE"
+ vd = b"THIS IS MY VENDOR_DATA"
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
{'user-data': ud, 'meta-data': yaml.safe_dump(md),
@@ -126,7 +126,7 @@ class TestNoCloudDataSource(TestCase):
def test_nocloud_no_vendordata(self):
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
- {'user-data': "ud", 'meta-data': "instance-id: IID\n"})
+ {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
@@ -134,7 +134,7 @@ class TestNoCloudDataSource(TestCase):
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, "ud")
+ self.assertEqual(dsrc.userdata_raw, b"ud")
self.assertFalse(dsrc.vendordata)
self.assertTrue(ret)
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 81ef1546..0aa1ba84 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -31,7 +31,7 @@ from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-import httpretty as hp
+hp = test_helpers.import_httpretty()
BASE_URL = "http://169.254.169.254"
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -49,7 +49,7 @@ EC2_META = {
'public-ipv4': '0.0.0.1',
'reservation-id': 'r-iru5qm4m',
}
-USER_DATA = '#!/bin/sh\necho This is user data\n'
+USER_DATA = b'#!/bin/sh\necho This is user data\n'
VENDOR_DATA = {
'magic': '',
}
@@ -63,8 +63,8 @@ OSTACK_META = {
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c',
}
-CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
OS_FILES = {
'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
'openstack/latest/user_data': USER_DATA,
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 8b62b1b1..adee9019 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,18 +24,28 @@
from __future__ import print_function
-from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
-from cloudinit.util import b64e
-from .. import helpers
import os
import os.path
import re
import shutil
-import tempfile
import stat
+import tempfile
import uuid
+from binascii import crc32
+
+import serial
+import six
+from cloudinit import helpers as c_helpers
+from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
+
+from .. import helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -54,60 +64,15 @@ MOCK_RETURNS = {
DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
-class MockSerial(object):
- """Fake a serial terminal for testing the code that
- interfaces with the serial"""
-
- port = None
+def get_mock_client(mockdata):
+ class MockMetadataClient(object):
- def __init__(self, mockdata):
- self.last = None
- self.last = None
- self.new = True
- self.count = 0
- self.mocked_out = []
- self.mockdata = mockdata
+ def __init__(self, serial):
+ pass
- def open(self):
- return True
-
- def close(self):
- return True
-
- def isOpen(self):
- return True
-
- def write(self, line):
- line = line.replace('GET ', '')
- self.last = line.rstrip()
-
- def readline(self):
- if self.new:
- self.new = False
- if self.last in self.mockdata:
- return 'SUCCESS\n'
- else:
- return 'NOTFOUND %s\n' % self.last
-
- if self.last in self.mockdata:
- if not self.mocked_out:
- self.mocked_out = [x for x in self._format_out()]
-
- if len(self.mocked_out) > self.count:
- self.count += 1
- return self.mocked_out[self.count - 1]
-
- def _format_out(self):
- if self.last in self.mockdata:
- _mret = self.mockdata[self.last]
- try:
- for l in _mret.splitlines():
- yield "%s\n" % l.rstrip()
- except:
- yield "%s\n" % _mret.rstrip()
-
- yield '.'
- yield '\n'
+ def get_metadata(self, metadata_key):
+ return mockdata.get(metadata_key)
+ return MockMetadataClient
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
@@ -155,9 +120,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if dmi_data is None:
dmi_data = DMI_DATA_RETURN
- def _get_serial(*_):
- return MockSerial(mockdata)
-
def _dmi_data():
return dmi_data
@@ -174,7 +136,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'get_serial', mock.MagicMock())])
+ self.apply_patches([
+ (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
self.apply_patches([(mod, 'device_exists', lambda d: True)])
@@ -443,6 +407,18 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.assertEqual(dsrc.device_name_to_device('FOO'),
mydscfg['disk_aliases']['FOO'])
+ @mock.patch('cloudinit.sources.DataSourceSmartOS.JoyentMetadataClient')
+ @mock.patch('cloudinit.sources.DataSourceSmartOS.get_serial')
+ def test_serial_console_closed_on_error(self, get_serial, metadata_client):
+ class OurException(Exception):
+ pass
+ metadata_client.side_effect = OurException
+ try:
+ DataSourceSmartOS.query_data('noun', 'device', 0)
+ except OurException:
+ pass
+ self.assertEqual(1, get_serial.return_value.close.call_count)
+
def apply_patches(patches):
ret = []
@@ -453,3 +429,133 @@ def apply_patches(patches):
setattr(ref, name, replace)
ret.append((ref, name, orig))
return ret
+
+
+class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestJoyentMetadataClient, self).setUp()
+ self.serial = mock.MagicMock(spec=serial.Serial)
+ self.request_id = 0xabcdef12
+ self.metadata_value = 'value'
+ self.response_parts = {
+ 'command': 'SUCCESS',
+ 'crc': 'b5a9ff00',
+ 'length': 17 + len(b64e(self.metadata_value)),
+ 'payload': b64e(self.metadata_value),
+ 'request_id': '{0:08x}'.format(self.request_id),
+ }
+
+ def make_response():
+ payload = ''
+ if self.response_parts['payload']:
+ payload = ' {0}'.format(self.response_parts['payload'])
+ del self.response_parts['payload']
+ return (
+ 'V2 {length} {crc} {request_id} {command}{payload}\n'.format(
+ payload=payload, **self.response_parts).encode('ascii'))
+ self.serial.readline.side_effect = make_response
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
+ mock.Mock(return_value=self.request_id)))
+
+ def _get_client(self):
+ return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+
+ def assertEndsWith(self, haystack, prefix):
+ self.assertTrue(haystack.endswith(prefix),
+ "{0} does not end with '{1}'".format(
+ repr(haystack), prefix))
+
+ def assertStartsWith(self, haystack, prefix):
+ self.assertTrue(haystack.startswith(prefix),
+ "{0} does not start with '{1}'".format(
+ repr(haystack), prefix))
+
+ def test_get_metadata_writes_a_single_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(1, self.serial.write.call_count)
+ written_line = self.serial.write.call_args[0][0]
+ self.assertEndsWith(written_line, b'\n')
+ self.assertEqual(1, written_line.count(b'\n'))
+
+ def _get_written_line(self, key='some_key'):
+ client = self._get_client()
+ client.get_metadata(key)
+ return self.serial.write.call_args[0][0]
+
+ def test_get_metadata_writes_bytes(self):
+ self.assertIsInstance(self._get_written_line(), six.binary_type)
+
+ def test_get_metadata_line_starts_with_v2(self):
+ self.assertStartsWith(self._get_written_line(), b'V2')
+
+ def test_get_metadata_uses_get_command(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ self.assertEqual('GET', parts[4])
+
+ def test_get_metadata_base64_encodes_argument(self):
+ key = 'my_key'
+ parts = self._get_written_line(key).decode('ascii').strip().split(' ')
+ self.assertEqual(b64e(key), parts[5])
+
+ def test_get_metadata_calculates_length_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_length = len(' '.join(parts[3:]))
+ self.assertEqual(expected_length, int(parts[1]))
+
+ def test_get_metadata_uses_appropriate_request_id(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ request_id = parts[3]
+ self.assertEqual(8, len(request_id))
+ self.assertEqual(request_id, request_id.lower())
+
+ def test_get_metadata_uses_random_number_for_request_id(self):
+ line = self._get_written_line()
+ request_id = line.decode('ascii').strip().split(' ')[3]
+ self.assertEqual('{0:08x}'.format(self.request_id), request_id)
+
+ def test_get_metadata_checksums_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_checksum = '{0:08x}'.format(
+ crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
+ checksum = parts[2]
+ self.assertEqual(expected_checksum, checksum)
+
+ def test_get_metadata_reads_a_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(1, self.serial.readline.call_count)
+
+ def test_get_metadata_returns_valid_value(self):
+ client = self._get_client()
+ value = client.get_metadata('some_key')
+ self.assertEqual(self.metadata_value, value)
+
+ def test_get_metadata_throws_exception_for_incorrect_length(self):
+ self.response_parts['length'] = 0
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_incorrect_crc(self):
+ self.response_parts['crc'] = 'deadbeef'
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_request_id_mismatch(self):
+ self.response_parts['request_id'] = 'deadbeef'
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_returns_None_if_value_not_found(self):
+ self.response_parts['payload'] = ''
+ self.response_parts['command'] = 'NOTFOUND'
+ self.response_parts['length'] = 17
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertIsNone(client.get_metadata('some_key'))
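
The TestJoyentMetadataClient cases above effectively document the V2 request framing used over the serial console: 'V2 <length> <crc> <request_id> GET <base64(key)>\n', where the length and crc32 fields cover everything from the request id onward. A sketch of an encoder matching those assertions (a reconstruction from the tests, not the datasource's own code):

from binascii import crc32

from cloudinit.util import b64e

def frame_get_request(key, request_id=0xabcdef12):
    # Body is '<request_id> GET <base64(key)>'; the length and crc32
    # header fields cover exactly this body.
    body = ' '.join(['{0:08x}'.format(request_id), 'GET', b64e(key)])
    checksum = crc32(body.encode('utf-8')) & 0xffffffff
    frame = 'V2 {0} {1:08x} {2}\n'.format(len(body), checksum, body)
    return frame.encode('ascii')

# frame_get_request('some_key') yields a single bytes line starting with
# b'V2 ' and ending with b'\n', matching the write assertions above.
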
diff --git a/tests/unittests/test_ec2_util.py b/tests/unittests/test_ec2_util.py
index 84aa002e..99fc54be 100644
--- a/tests/unittests/test_ec2_util.py
+++ b/tests/unittests/test_ec2_util.py
@@ -3,7 +3,7 @@ from . import helpers
from cloudinit import ec2_utils as eu
from cloudinit import url_helper as uh
-import httpretty as hp
+hp = helpers.import_httpretty()
class TestEc2Util(helpers.HttprettyTestCase):
@@ -16,7 +16,7 @@ class TestEc2Util(helpers.HttprettyTestCase):
body='stuff',
status=200)
userdata = eu.get_instance_userdata(self.VERSION)
- self.assertEquals('stuff', userdata)
+ self.assertEquals('stuff', userdata.decode('utf-8'))
@hp.activate
def test_userdata_fetch_fail_not_found(self):
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
index d8fe9a4f..895728b3 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -7,7 +7,6 @@ import os
import re
import shutil
import tempfile
-import unittest
class TestAptProxyConfig(TestCase):
@@ -30,7 +29,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = util.load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_http_proxy_written(self):
@@ -40,7 +39,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = util.load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_all_proxy_written(self):
@@ -58,7 +57,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = util.load_tfile_or_url(self.pfile)
for ptype, pval in values.items():
self.assertTrue(self._search_apt_config(contents, ptype, pval))
@@ -74,7 +73,7 @@ class TestAptProxyConfig(TestCase):
cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = util.load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "foo"))
def test_config_written(self):
@@ -86,14 +85,14 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.cfile))
self.assertFalse(os.path.isfile(self.pfile))
- self.assertEqual(str(util.read_file_or_url(self.cfile)), payload)
+ self.assertEqual(util.load_tfile_or_url(self.cfile), payload)
def test_config_replaced(self):
util.write_file(self.pfile, "content doesnt matter")
cc_apt_configure.apply_apt_config({'apt_config': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.cfile))
- self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo")
+ self.assertEqual(util.load_tfile_or_url(self.cfile), "foo")
def test_config_deleted(self):
# if no 'apt_config' is provided, delete any previously written file
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
new file mode 100644
index 00000000..ddef8d48
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_disk_setup.py
@@ -0,0 +1,30 @@
+from cloudinit.config import cc_disk_setup
+from ..helpers import ExitStack, mock, TestCase
+
+
+class TestIsDiskUsed(TestCase):
+
+ def setUp(self):
+ super(TestIsDiskUsed, self).setUp()
+ self.patches = ExitStack()
+ mod_name = 'cloudinit.config.cc_disk_setup'
+ self.enumerate_disk = self.patches.enter_context(
+ mock.patch('{0}.enumerate_disk'.format(mod_name)))
+ self.check_fs = self.patches.enter_context(
+ mock.patch('{0}.check_fs'.format(mod_name)))
+
+ def test_multiple_child_nodes_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_valid_filesystem_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (
+ mock.MagicMock(), 'ext4', mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_one_child_nodes_and_no_fs_returns_false(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock()))
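
The three TestIsDiskUsed cases above pin down the decision is_disk_used() is expected to make: a device counts as in use when it has more than one enumerated child node, or when check_fs() reports any filesystem on it. A compact restatement of that logic (an assumed reconstruction, not the cc_disk_setup implementation):

def is_disk_used_sketch(child_nodes, detected_fs_type):
    # More than one enumerated child node (e.g. partitions) means "used".
    if len(list(child_nodes)) > 1:
        return True
    # Otherwise the disk only counts as used if a filesystem was found.
    return detected_fs_type is not None
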
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
new file mode 100644
index 00000000..eceb14d9
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -0,0 +1,306 @@
+from cloudinit.config.cc_snappy import (
+ makeop, get_package_ops, render_snap_op)
+from cloudinit import util
+from .. import helpers as t_help
+
+import os
+import shutil
+import tempfile
+import yaml
+
+ALLOWED = (dict, list, int, str)
+
+
+class TestInstallPackages(t_help.TestCase):
+ def setUp(self):
+ super(TestInstallPackages, self).setUp()
+ self.unapply = []
+
+ # by default 'which' has nothing in its path
+ self.apply_patches([(util, 'subp', self._subp)])
+ self.subp_called = []
+ self.snapcmds = []
+ self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ shutil.rmtree(self.tmp)
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def populate_tmp(self, files):
+ return t_help.populate_dir(self.tmp, files)
+
+ def _subp(self, *args, **kwargs):
+ # supports subp calling with cmd as args or kwargs
+ if 'args' not in kwargs:
+ kwargs['args'] = args[0]
+ self.subp_called.append(kwargs)
+ args = kwargs['args']
+ # here we basically parse the snappy command invoked
+ # and append to snapcmds a list of (mode, pkg, config)
+ if args[0:2] == ['snappy', 'config']:
+ if args[3] == "-":
+ config = kwargs.get('data', '')
+ else:
+ with open(args[3], "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['config', args[2], config])
+ elif args[0:2] == ['snappy', 'install']:
+ config = None
+ pkg = None
+ for arg in args[2:]:
+ if arg.startswith("-"):
+ continue
+ if not pkg:
+ pkg = arg
+ elif not config:
+ cfgfile = arg
+ if cfgfile == "-":
+ config = kwargs.get('data', '')
+ elif cfgfile:
+ with open(cfgfile, "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['install', pkg, config])
+
+ def test_package_ops_1(self):
+ ret = get_package_ops(
+ packages=['pkg1', 'pkg2', 'pkg3'],
+ configs={'pkg2': b'mycfg2'}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg1', None, None),
+ makeop('install', 'pkg2', b'mycfg2', None),
+ makeop('install', 'pkg3', None, None)])
+
+ def test_package_ops_config_only(self):
+ ret = get_package_ops(
+ packages=None,
+ configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
+ self.assertEqual(
+ ret, [makeop('config', 'pkg2', b'mycfg2')])
+
+ def test_package_ops_install_and_config(self):
+ ret = get_package_ops(
+ packages=['pkg3', 'pkg2'],
+ configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
+ installed=['xinstalled'])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg3'),
+ makeop('install', 'pkg2', b'mycfg2'),
+ makeop('config', 'xinstalled', b'xcfg')])
+
+ def test_package_ops_install_long_config_short(self):
+ # a package can be installed by full name, but have config by short
+ cfg = {'k1': 'k2'}
+ ret = get_package_ops(
+ packages=['config-example.canonical'],
+ configs={'config-example': cfg}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'config-example.canonical', cfg)])
+
+ def test_package_ops_with_file(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
+ "snapf2.snap": b"foo2", "foo.bar": "ignored"})
+ ret = get_package_ops(
+ packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config"),
+ makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
+ makeop('install', 'pkg1')])
+
+ def test_package_ops_common_filename(self):
+ # fish package name from filename
+ # package names likely look like: pkgname.namespace_version_arch.snap
+
+ # find filenames
+ self.populate_tmp(
+ {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
+ "pkg-ws.config": "pkg-ws-config",
+ "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
+ "pkg1.smoser.config": "pkg1.smoser.config-data",
+ "pkg1.config": "pkg1.config-data",
+ "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
+ "pkg2.smoser_0.0_amd64.config": "pkg2.config",
+ })
+
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
+ path="pkg-ws.smoser_0.3.4_all.snap",
+ cfgfile="pkg-ws.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
+ path="pkg1.smoser_1.2.3_all.snap",
+ cfgfile="pkg1.smoser.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
+ path="pkg2.smoser_0.0_amd64.snap",
+ cfgfile="pkg2.smoser_0.0_amd64.config"),
+ ])
+
+ def test_package_ops_config_overrides_file(self):
+ # config data overrides local file .config
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
+ ret = get_package_ops(
+ packages=[], configs={'snapf1': 'snapf1cfg-config'},
+ installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path="snapf1.snap", config="snapf1cfg-config")])
+
+ def test_package_ops_namespacing(self):
+ cfgs = {
+ 'config-example': {'k1': 'v1'},
+ 'pkg1': {'p1': 'p2'},
+ 'ubuntu-core': {'c1': 'c2'},
+ 'notinstalled.smoser': {'s1': 's2'},
+ }
+ ret = get_package_ops(
+ packages=['config-example.canonical'], configs=cfgs,
+ installed=['config-example.smoser', 'pkg1.canonical',
+ 'ubuntu-core'])
+
+ expected_configs = [
+ makeop('config', 'pkg1', config=cfgs['pkg1']),
+ makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
+ expected_installs = [
+ makeop('install', 'config-example.canonical',
+ config=cfgs['config-example'])]
+
+ installs = [i for i in ret if i['op'] == 'install']
+ configs = [c for c in ret if c['op'] == 'config']
+
+ self.assertEqual(installs, expected_installs)
+ # configs are not ordered
+ self.assertEqual(len(configs), len(expected_configs))
+ self.assertTrue(all(found in expected_configs for found in configs))
+
+ def test_render_op_localsnap(self):
+ self.populate_tmp({"snapf1.snap": b"foo1"})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], None]])
+
+ def test_render_op_localsnap_localconfig(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap', cfgfile='snapf1.config')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], 'snapf1cfg']])
+
+ def test_render_op_snap(self):
+ op = makeop('install', 'snapf1')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', 'snapf1', None]])
+
+ def test_render_op_snap_config(self):
+ mycfg = {'key1': 'value1'}
+ name = "snapf1"
+ op = makeop('install', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', name, {'config': {name: mycfg}}]])
+
+ def test_render_op_config_bytes(self):
+ name = "snapf1"
+ mycfg = b'myconfig'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_string(self):
+ name = 'snapf1'
+ mycfg = 'myconfig: foo\nhisconfig: bar\n'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_dict(self):
+ # config entry for package can be a dict, not a string blob
+ mycfg = {'foo': 'bar'}
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ # snapcmds is a list of 3-entry lists. data_found will be the
+ # blob of data in the file in 'snappy install --config=<file>'
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_list(self):
+ # config entry for package can be a list, not a string blob
+ mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
+ name = "snapf1"
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_int(self):
+ # config entry for package can be an int, not a string blob
+ mycfg = 1
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_long_configs_short(self):
+ # installing a namespaced package should have un-namespaced config
+ mycfg = {'k1': 'k2'}
+ name = 'snapf1'
+ op = makeop('install', name + ".smoser", config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_does_not_pad_cfgfile(self):
+ # package_ops with cfgfile should not modify --file= content.
+ mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config")])
+
+ # now the op was ok, but test that render didn't mess it up.
+ render_snap_op(**ret[0])
+ data_found = self.snapcmds[0][2]
+ # the data found gets loaded in the snapcmd interpretation
+ # so this comparison is a bit lossy, but input to snappy config
+ # is expected to be yaml loadable, so it should be OK.
+ self.assertEqual(yaml.safe_load(mydata), data_found)
+
+
+def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
+ if cfgfile:
+ cfgfile = os.path.sep.join([tmpd, cfgfile])
+ if path:
+ path = os.path.sep.join([tmpd, path])
+ return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile))
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_pathprefix2dict.py b/tests/unittests/test_pathprefix2dict.py
index 7089bde6..38fd75b6 100644
--- a/tests/unittests/test_pathprefix2dict.py
+++ b/tests/unittests/test_pathprefix2dict.py
@@ -14,28 +14,28 @@ class TestPathPrefix2Dict(TestCase):
self.addCleanup(shutil.rmtree, self.tmp)
def test_required_only(self):
- dirdata = {'f1': 'f1content', 'f2': 'f2content'}
+ dirdata = {'f1': b'f1content', 'f2': b'f2content'}
populate_dir(self.tmp, dirdata)
ret = util.pathprefix2dict(self.tmp, required=['f1', 'f2'])
self.assertEqual(dirdata, ret)
def test_required_missing(self):
- dirdata = {'f1': 'f1content'}
+ dirdata = {'f1': b'f1content'}
populate_dir(self.tmp, dirdata)
kwargs = {'required': ['f1', 'f2']}
self.assertRaises(ValueError, util.pathprefix2dict, self.tmp, **kwargs)
def test_no_required_and_optional(self):
- dirdata = {'f1': 'f1c', 'f2': 'f2c'}
+ dirdata = {'f1': b'f1c', 'f2': b'f2c'}
populate_dir(self.tmp, dirdata)
ret = util.pathprefix2dict(self.tmp, required=None,
- optional=['f1', 'f2'])
+ optional=['f1', 'f2'])
self.assertEqual(dirdata, ret)
def test_required_and_optional(self):
- dirdata = {'f1': 'f1c', 'f2': 'f2c'}
+ dirdata = {'f1': b'f1c', 'f2': b'f2c'}
populate_dir(self.tmp, dirdata)
ret = util.pathprefix2dict(self.tmp, required=['f1'], optional=['f2'])
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index cf7c03b0..0c19a2c2 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -18,10 +18,6 @@
from __future__ import print_function
-import sys
-import six
-import unittest
-
from . import helpers as test_helpers
import textwrap
@@ -30,6 +26,7 @@ from cloudinit import templater
try:
import Cheetah
HAS_CHEETAH = True
+ Cheetah # make pyflakes happy, as Cheetah is not used here
except ImportError:
HAS_CHEETAH = False
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 33c191a9..1619b5d2 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -323,58 +323,67 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
class TestReadDMIData(helpers.FilesystemMockingTestCase):
- def _patchIn(self, root):
- self.patchOS(root)
- self.patchUtils(root)
+ def setUp(self):
+ super(TestReadDMIData, self).setUp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.patchOS(self.new_root)
+ self.patchUtils(self.new_root)
- def _write_key(self, key, content):
- """Mocks the sys path found on Linux systems."""
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
+ def _create_sysfs_parent_directory(self):
util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
+ def _create_sysfs_file(self, key, content):
+ """Mocks the sys path found on Linux systems."""
+ self._create_sysfs_parent_directory()
dmi_key = "/sys/class/dmi/id/{0}".format(key)
util.write_file(dmi_key, content)
- def _no_syspath(self, key, content):
+ def _configure_dmidecode_return(self, key, content, error=None):
"""
In order to test a missing sys path and call outs to dmidecode, this
function fakes the results of dmidecode to test the results.
"""
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
- self.real_which = util.which
- self.real_subp = util.subp
-
- def _which(key):
- return True
- util.which = _which
-
- def _cdd(_key, error=None):
+ def _dmidecode_subp(cmd):
+ if cmd[-1] != key:
+ raise util.ProcessExecutionError()
return (content, error)
- util.subp = _cdd
-
- def test_key(self):
- key_content = "TEST-KEY-DATA"
- self._write_key("key", key_content)
- self.assertEquals(key_content, util.read_dmi_data("key"))
- def test_key_mismatch(self):
- self._write_key("test", "ABC")
- self.assertNotEqual("123", util.read_dmi_data("test"))
-
- def test_no_key(self):
- self._no_syspath(None, None)
- self.assertFalse(util.read_dmi_data("key"))
-
- def test_callout_dmidecode(self):
- """test to make sure that dmidecode is used when no syspath"""
- self._no_syspath("key", "stuff")
- self.assertEquals("stuff", util.read_dmi_data("key"))
- self._no_syspath("key", None)
- self.assertFalse(None, util.read_dmi_data("key"))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'which', lambda _: True))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', _dmidecode_subp))
+
+ def patch_mapping(self, new_mapping):
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.util.DMIDECODE_TO_DMI_SYS_MAPPING',
+ new_mapping))
+
+ def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self):
+ self.patch_mapping({'mapped-key': 'mapped-value'})
+ expected_dmi_value = 'sys-used-correctly'
+ self._create_sysfs_file('mapped-value', expected_dmi_value)
+ self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong')
+ self.assertEqual(expected_dmi_value, util.read_dmi_data('mapped-key'))
+
+ def test_dmidecode_used_if_no_sysfs_file_on_disk(self):
+ self.patch_mapping({})
+ self._create_sysfs_parent_directory()
+ expected_dmi_value = 'dmidecode-used'
+ self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
+ self.assertEqual(expected_dmi_value,
+ util.read_dmi_data('use-dmidecode'))
+
+ def test_none_returned_if_neither_source_has_data(self):
+ self.patch_mapping({})
+ self._configure_dmidecode_return('key', 'value')
+ self.assertEqual(None, util.read_dmi_data('expect-fail'))
+
+ def test_none_returned_if_dmidecode_not_in_path(self):
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'which', lambda _: False))
+ self.patch_mapping({})
+ self.assertEqual(None, util.read_dmi_data('expect-fail'))
class TestMultiLog(helpers.FilesystemMockingTestCase):
@@ -443,4 +452,11 @@ class TestMultiLog(helpers.FilesystemMockingTestCase):
util.multi_log('message', log=log, log_level=log_level)
self.assertEqual((log_level, mock.ANY), log.log.call_args[0])
+
+class TestMessageFromString(helpers.TestCase):
+
+ def test_unicode_not_messed_up(self):
+ roundtripped = util.message_from_string(u'\n').as_string()
+ self.assertNotIn('\x00', roundtripped)
+
# vi: ts=4 expandtab
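
Finally, the reworked TestReadDMIData cases describe a two-step lookup for util.read_dmi_data(): consult the mapped file under /sys/class/dmi/id/ first, then fall back to dmidecode only when the file is absent and the binary is on PATH, returning None when neither source has data. A rough sketch of that order; the dmidecode invocation is left abstract since its exact arguments are not shown in this diff:

import os

def read_dmi_data_sketch(key, sys_mapping, dmidecode_on_path, run_dmidecode):
    # 1. Try the mapped file under /sys/class/dmi/id/.
    sysfs_name = sys_mapping.get(key)
    if sysfs_name:
        path = os.path.join('/sys/class/dmi/id', sysfs_name)
        if os.path.isfile(path):
            with open(path) as fp:
                return fp.read()
    # 2. Fall back to dmidecode if it is on PATH; tolerate it failing
    #    for keys it does not know about.
    if dmidecode_on_path:
        try:
            return run_dmidecode(key)
        except Exception:
            return None
    # 3. Neither source had data.
    return None
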