summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test_cli.py54
-rw-r--r--tests/unittests/test_data.py31
-rw-r--r--tests/unittests/test_datasource/test_azure.py330
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py444
-rw-r--r--tests/unittests/test_datasource/test_gce.py49
-rw-r--r--tests/unittests/test_datasource/test_smartos.py2
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py17
-rw-r--r--tests/unittests/test_handler/test_handler_disk_setup.py30
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py5
-rw-r--r--tests/unittests/test_templating.py5
-rw-r--r--tests/unittests/test_util.py17
11 files changed, 892 insertions, 92 deletions
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
new file mode 100644
index 00000000..ed863399
--- /dev/null
+++ b/tests/unittests/test_cli.py
@@ -0,0 +1,54 @@
+import imp
+import os
+import sys
+import six
+
+from . import helpers as test_helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+BIN_CLOUDINIT = "bin/cloud-init"
+
+
+class TestCLI(test_helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestCLI, self).setUp()
+ self.stderr = six.StringIO()
+ self.patchStdoutAndStderr(stderr=self.stderr)
+ self.sys_exit = mock.MagicMock()
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, 'exit', self.sys_exit))
+
+ def _call_main(self):
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, 'argv', ['cloud-init']))
+ cli = imp.load_module(
+ 'cli', open(BIN_CLOUDINIT), '', ('', 'r', imp.PY_SOURCE))
+ try:
+ return cli.main()
+ except:
+ pass
+
+ @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
+ def test_no_arguments_shows_usage(self):
+ self._call_main()
+ self.assertIn('usage: cloud-init', self.stderr.getvalue())
+
+ @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
+ def test_no_arguments_exits_2(self):
+ exit_code = self._call_main()
+ if self.sys_exit.call_count:
+ self.assertEqual(mock.call(2), self.sys_exit.call_args)
+ else:
+ self.assertEqual(2, exit_code)
+
+ @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
+ def test_no_arguments_shows_error_message(self):
+ self._call_main()
+ self.assertIn('cloud-init: error: too few arguments',
+ self.stderr.getvalue())
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 4f24e2dd..c603bfdb 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -494,10 +494,10 @@ c: 4
])
def test_mime_application_octet_stream(self):
- """Mime message of type application/octet-stream is ignored but shows warning."""
+ """Mime type application/octet-stream is ignored but shows warning."""
ci = stages.Init()
message = MIMEBase("application", "octet-stream")
- message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc\xbf')
+ message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
encoders.encode_base64(message)
ci.datasource = FakeDataSource(message.as_string().encode())
@@ -511,6 +511,33 @@ c: 4
mockobj.assert_called_once_with(
ci.paths.get_ipath("cloud_config"), "", 0o600)
+ def test_cloud_config_archive(self):
+ non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
+ data = [{'content': '#cloud-config\npassword: gocubs\n'},
+ {'content': '#cloud-config\nlocale: chicago\n'},
+ {'content': non_decodable}]
+ message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(message)
+
+ fs = {}
+
+ def fsstore(filename, content, mode=0o0644, omode="wb"):
+ fs[filename] = content
+
+ # consuming the user-data provided should write 'cloud_config' file
+ # which will have our yaml in it.
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ mockobj.side_effect = fsstore
+ ci.fetch()
+ ci.consume_data()
+
+ cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
+ self.assertEqual(cfg.get('password'), 'gocubs')
+ self.assertEqual(cfg.get('locale'), 'chicago')
+
+
class TestUDProcess(helpers.ResourceUsingTestCase):
def test_bytes_in_userdata(self):
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 8112c69b..33b971f6 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -18,7 +18,7 @@ import stat
import yaml
import shutil
import tempfile
-import unittest
+import xml.etree.ElementTree as ET
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -116,9 +116,6 @@ class TestAzureDataSource(TestCase):
data['iid_from_shared_cfg'] = path
return 'i-my-azure-id'
- def _apply_hostname_bounce(**kwargs):
- data['apply_hostname_bounce'] = kwargs
-
if data.get('ovfcontent') is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
@@ -126,20 +123,61 @@ class TestAzureDataSource(TestCase):
mod = DataSourceAzure
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.get_metadata_from_fabric = mock.MagicMock(return_value={
+ 'instance-id': 'i-my-azure-id',
+ 'public-keys': [],
+ })
+
self.apply_patches([
(mod, 'list_possible_azure_ds_devs', dsdevs),
(mod, 'invoke_agent', _invoke_agent),
(mod, 'wait_for_files', _wait_for_files),
(mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
(mod, 'iid_from_shared_config', _iid_from_shared_config),
- (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
- ])
+ (mod, 'perform_hostname_bounce', mock.MagicMock()),
+ (mod, 'get_hostname', mock.MagicMock()),
+ (mod, 'set_hostname', mock.MagicMock()),
+ (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
return dsrc
+ def xml_equals(self, oxml, nxml):
+ """Compare two sets of XML to make sure they are equal"""
+
+ def create_tag_index(xml):
+ et = ET.fromstring(xml)
+ ret = {}
+ for x in et.iter():
+ ret[x.tag] = x
+ return ret
+
+ def tags_exists(x, y):
+ for tag in x.keys():
+ self.assertIn(tag, y)
+ for tag in y.keys():
+ self.assertIn(tag, x)
+
+ def tags_equal(x, y):
+ for x_tag, x_val in x.items():
+ y_val = y.get(x_val.tag)
+ self.assertEquals(x_val.text, y_val.text)
+
+ old_cnt = create_tag_index(oxml)
+ new_cnt = create_tag_index(nxml)
+ tags_exists(old_cnt, new_cnt)
+ tags_equal(old_cnt, new_cnt)
+
+ def xml_notequals(self, oxml, nxml):
+ try:
+ self.xml_equals(oxml, nxml)
+ except AssertionError as e:
+ return
+ raise AssertionError("XML is the same")
+
def test_basic_seed_dir(self):
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
@@ -272,47 +310,6 @@ class TestAzureDataSource(TestCase):
for mypk in mypklist:
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
- def test_disabled_bounce(self):
- pass
-
- def test_apply_bounce_call_1(self):
- # hostname needs to get through to apply_hostname_bounce
- odata = {'HostName': 'my-random-hostname'}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- self._get_ds(data).get_data()
- self.assertIn('hostname', data['apply_hostname_bounce'])
- self.assertEqual(data['apply_hostname_bounce']['hostname'],
- odata['HostName'])
-
- def test_apply_bounce_call_configurable(self):
- # hostname_bounce should be configurable in datasource cfg
- cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
- 'command': 'my-bounce-command',
- 'hostname_command': 'my-hostname-command'}}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
-
- for k in cfg['hostname_bounce']:
- self.assertIn(k, data['apply_hostname_bounce'])
-
- for k, v in cfg['hostname_bounce'].items():
- self.assertEqual(data['apply_hostname_bounce'][k], v)
-
- def test_set_hostname_disabled(self):
- # config specifying set_hostname off should not bounce
- cfg = {'set_hostname': False}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
-
- self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
-
def test_default_ephemeral(self):
# make sure the ephemeral device works
odata = {}
@@ -359,6 +356,31 @@ class TestAzureDataSource(TestCase):
self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
+ def test_password_redacted_in_ovf(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserPassword': "mypass"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+
+ self.assertTrue(ret)
+ ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
+
+ # The XML should not be same since the user password is redacted
+ on_disk_ovf = load_file(ovf_env_path)
+ self.xml_notequals(data['ovfcontent'], on_disk_ovf)
+
+ # Make sure that the redacted password on disk is not used by CI
+ self.assertNotEquals(dsrc.cfg.get('password'),
+ DataSourceAzure.DEF_PASSWD_REDACTION)
+
+ # Make sure that the password was really encrypted
+ et = ET.fromstring(on_disk_ovf)
+ for elem in et.iter():
+ if 'UserPassword' in elem.tag:
+ self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION,
+ elem.text)
+
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
dsrc = self._get_ds({'ovfcontent': xml})
@@ -368,7 +390,7 @@ class TestAzureDataSource(TestCase):
# we expect that the ovf-env.xml file is copied there.
ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
self.assertTrue(os.path.exists(ovf_env_path))
- self.assertEqual(xml, load_file(ovf_env_path))
+ self.xml_equals(xml, load_file(ovf_env_path))
def test_ovf_can_include_unicode(self):
xml = construct_valid_ovf_env(data={})
@@ -417,12 +439,198 @@ class TestAzureDataSource(TestCase):
self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
self.assertTrue(os.path.exists(
os.path.join(self.waagent_d, 'otherfile')))
- self.assertFalse(
- os.path.exists(os.path.join(self.waagent_d, 'SharedConfig.xml')))
- self.assertTrue(
- os.path.exists(os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertEqual(new_ovfenv,
- load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
+ self.assertFalse(os.path.exists(
+ os.path.join(self.waagent_d, 'SharedConfig.xml')))
+ self.assertTrue(os.path.exists(
+ os.path.join(self.waagent_d, 'ovf-env.xml')))
+ new_xml = load_file(os.path.join(self.waagent_d, 'ovf-env.xml'))
+ self.xml_equals(new_ovfenv, new_xml)
+
+ def test_exception_fetching_fabric_data_doesnt_propagate(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ self.get_metadata_from_fabric.side_effect = Exception
+ self.assertFalse(ds.get_data())
+
+ def test_fabric_data_included_in_metadata(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ self.get_metadata_from_fabric.return_value = {'test': 'value'}
+ ret = ds.get_data()
+ self.assertTrue(ret)
+ self.assertEqual('value', ds.metadata['test'])
+
+
+class TestAzureBounce(TestCase):
+
+ def mock_out_azure_moving_parts(self):
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'invoke_agent'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'wait_for_files'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'iid_from_shared_config',
+ mock.MagicMock(return_value='i-my-azure-id')))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
+ mock.MagicMock(return_value=[])))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'find_ephemeral_disk',
+ mock.MagicMock(return_value=None)))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'find_ephemeral_part',
+ mock.MagicMock(return_value=None)))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric',
+ mock.MagicMock(return_value={})))
+
+ def setUp(self):
+ super(TestAzureBounce, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.addCleanup(shutil.rmtree, self.tmp)
+ DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.patches = ExitStack()
+ self.mock_out_azure_moving_parts()
+ self.get_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'get_hostname'))
+ self.set_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'set_hostname'))
+ self.subp = self.patches.enter_context(
+ mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
+
+ def tearDown(self):
+ self.patches.close()
+
+ def _get_ds(self, ovfcontent=None):
+ if ovfcontent is not None:
+ populate_dir(os.path.join(self.paths.seed_dir, "azure"),
+ {'ovf-env.xml': ovfcontent})
+ return DataSourceAzure.DataSourceAzureNet(
+ {}, distro=None, paths=self.paths)
+
+ def get_ovf_env_with_dscfg(self, hostname, cfg):
+ odata = {
+ 'HostName': hostname,
+ 'dscfg': {
+ 'text': b64e(yaml.dump(cfg)),
+ 'encoding': 'base64'
+ }
+ }
+ return construct_valid_ovf_env(data=odata)
+
+ def test_disabled_bounce_does_not_change_hostname(self):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_disabled_bounce_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_same_hostname_does_not_change_hostname(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_unchanged_hostname_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_force_performs_bounce_regardless(self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname(self):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(expected_hostname,
+ self.set_hostname.call_args_list[0][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_different_hostnames_performs_bounce(
+ self, perform_hostname_bounce):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname_back(self):
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_failure_in_bounce_still_resets_host_name(
+ self, perform_hostname_bounce):
+ perform_hostname_bounce.side_effect = Exception
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ def test_environment_correct_for_bounce_command(self):
+ interface = 'int0'
+ hostname = 'my-new-host'
+ old_hostname = 'my-old-host'
+ self.get_hostname.return_value = old_hostname
+ cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg(hostname, cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_env = self.subp.call_args[1]['env']
+ self.assertEqual(interface, bounce_env['interface'])
+ self.assertEqual(hostname, bounce_env['hostname'])
+ self.assertEqual(old_hostname, bounce_env['old_hostname'])
+
+ def test_default_bounce_command_used_by_default(self):
+ cmd = 'default-bounce-command'
+ DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_args = self.subp.call_args[1]['args']
+ self.assertEqual(cmd, bounce_args)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_set_hostname_option_can_disable_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_set_hostname_option_can_disable_hostname_set(self):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, self.set_hostname.call_count)
class TestReadAzureOvf(TestCase):
@@ -438,17 +646,3 @@ class TestReadAzureOvf(TestCase):
(_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
for mypk in mypklist:
self.assertIn(mypk, cfg['_pubkeys'])
-
-
-class TestReadAzureSharedConfig(unittest.TestCase):
- def test_valid_content(self):
- xml = """<?xml version="1.0" encoding="utf-8"?>
- <SharedConfig>
- <Deployment name="MY_INSTANCE_ID">
- <Service name="myservice"/>
- <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
- </Deployment>
- <Incarnation number="1"/>
- </SharedConfig>"""
- ret = DataSourceAzure.iid_from_shared_config_content(xml)
- self.assertEqual("MY_INSTANCE_ID", ret)
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
new file mode 100644
index 00000000..a5228870
--- /dev/null
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -0,0 +1,444 @@
+import os
+import struct
+import unittest
+
+from cloudinit.sources.helpers import azure as azure_helper
+from ..helpers import TestCase
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
+
+GOAL_STATE_TEMPLATE = """\
+<?xml version="1.0" encoding="utf-8"?>
+<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="goalstate10.xsd">
+ <Version>2012-11-30</Version>
+ <Incarnation>{incarnation}</Incarnation>
+ <Machine>
+ <ExpectedState>Started</ExpectedState>
+ <StopRolesDeadlineHint>300000</StopRolesDeadlineHint>
+ <LBProbePorts>
+ <Port>16001</Port>
+ </LBProbePorts>
+ <ExpectHealthReport>FALSE</ExpectHealthReport>
+ </Machine>
+ <Container>
+ <ContainerId>{container_id}</ContainerId>
+ <RoleInstanceList>
+ <RoleInstance>
+ <InstanceId>{instance_id}</InstanceId>
+ <State>Started</State>
+ <Configuration>
+ <HostingEnvironmentConfig>
+ http://100.86.192.70:80/...hostingEnvironmentConfig...
+ </HostingEnvironmentConfig>
+ <SharedConfig>{shared_config_url}</SharedConfig>
+ <ExtensionsConfig>
+ http://100.86.192.70:80/...extensionsConfig...
+ </ExtensionsConfig>
+ <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig>
+ <Certificates>{certificates_url}</Certificates>
+ <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName>
+ </Configuration>
+ </RoleInstance>
+ </RoleInstanceList>
+ </Container>
+</GoalState>
+"""
+
+
+class TestReadAzureSharedConfig(unittest.TestCase):
+
+ def test_valid_content(self):
+ xml = """<?xml version="1.0" encoding="utf-8"?>
+ <SharedConfig>
+ <Deployment name="MY_INSTANCE_ID">
+ <Service name="myservice"/>
+ <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
+ </Deployment>
+ <Incarnation number="1"/>
+ </SharedConfig>"""
+ ret = azure_helper.iid_from_shared_config_content(xml)
+ self.assertEqual("MY_INSTANCE_ID", ret)
+
+
+class TestFindEndpoint(TestCase):
+
+ def setUp(self):
+ super(TestFindEndpoint, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.load_file = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'load_file'))
+
+ def test_missing_file(self):
+ self.load_file.side_effect = IOError
+ self.assertRaises(IOError,
+ azure_helper.WALinuxAgentShim.find_endpoint)
+
+ def test_missing_special_azure_line(self):
+ self.load_file.return_value = ''
+ self.assertRaises(Exception,
+ azure_helper.WALinuxAgentShim.find_endpoint)
+
+ def _build_lease_content(self, ip_address, use_hex=True):
+ ip_address_repr = ':'.join(
+ [hex(int(part)).replace('0x', '')
+ for part in ip_address.split('.')])
+ if not use_hex:
+ ip_address_repr = struct.pack(
+ '>L', int(ip_address_repr.replace(':', ''), 16))
+ ip_address_repr = '"{0}"'.format(ip_address_repr.decode('utf-8'))
+ return '\n'.join([
+ 'lease {',
+ ' interface "eth0";',
+ ' option unknown-245 {0};'.format(ip_address_repr),
+ '}'])
+
+ def test_hex_string(self):
+ ip_address = '98.76.54.32'
+ file_content = self._build_lease_content(ip_address)
+ self.load_file.return_value = file_content
+ self.assertEqual(ip_address,
+ azure_helper.WALinuxAgentShim.find_endpoint())
+
+ def test_hex_string_with_single_character_part(self):
+ ip_address = '4.3.2.1'
+ file_content = self._build_lease_content(ip_address)
+ self.load_file.return_value = file_content
+ self.assertEqual(ip_address,
+ azure_helper.WALinuxAgentShim.find_endpoint())
+
+ def test_packed_string(self):
+ ip_address = '98.76.54.32'
+ file_content = self._build_lease_content(ip_address, use_hex=False)
+ self.load_file.return_value = file_content
+ self.assertEqual(ip_address,
+ azure_helper.WALinuxAgentShim.find_endpoint())
+
+ def test_latest_lease_used(self):
+ ip_addresses = ['4.3.2.1', '98.76.54.32']
+ file_content = '\n'.join([self._build_lease_content(ip_address)
+ for ip_address in ip_addresses])
+ self.load_file.return_value = file_content
+ self.assertEqual(ip_addresses[-1],
+ azure_helper.WALinuxAgentShim.find_endpoint())
+
+
+class TestGoalStateParsing(TestCase):
+
+ default_parameters = {
+ 'incarnation': 1,
+ 'container_id': 'MyContainerId',
+ 'instance_id': 'MyInstanceId',
+ 'shared_config_url': 'MySharedConfigUrl',
+ 'certificates_url': 'MyCertificatesUrl',
+ }
+
+ def _get_goal_state(self, http_client=None, **kwargs):
+ if http_client is None:
+ http_client = mock.MagicMock()
+ parameters = self.default_parameters.copy()
+ parameters.update(kwargs)
+ xml = GOAL_STATE_TEMPLATE.format(**parameters)
+ if parameters['certificates_url'] is None:
+ new_xml_lines = []
+ for line in xml.splitlines():
+ if 'Certificates' in line:
+ continue
+ new_xml_lines.append(line)
+ xml = '\n'.join(new_xml_lines)
+ return azure_helper.GoalState(xml, http_client)
+
+ def test_incarnation_parsed_correctly(self):
+ incarnation = '123'
+ goal_state = self._get_goal_state(incarnation=incarnation)
+ self.assertEqual(incarnation, goal_state.incarnation)
+
+ def test_container_id_parsed_correctly(self):
+ container_id = 'TestContainerId'
+ goal_state = self._get_goal_state(container_id=container_id)
+ self.assertEqual(container_id, goal_state.container_id)
+
+ def test_instance_id_parsed_correctly(self):
+ instance_id = 'TestInstanceId'
+ goal_state = self._get_goal_state(instance_id=instance_id)
+ self.assertEqual(instance_id, goal_state.instance_id)
+
+ def test_shared_config_xml_parsed_and_fetched_correctly(self):
+ http_client = mock.MagicMock()
+ shared_config_url = 'TestSharedConfigUrl'
+ goal_state = self._get_goal_state(
+ http_client=http_client, shared_config_url=shared_config_url)
+ shared_config_xml = goal_state.shared_config_xml
+ self.assertEqual(1, http_client.get.call_count)
+ self.assertEqual(shared_config_url, http_client.get.call_args[0][0])
+ self.assertEqual(http_client.get.return_value.contents,
+ shared_config_xml)
+
+ def test_certificates_xml_parsed_and_fetched_correctly(self):
+ http_client = mock.MagicMock()
+ certificates_url = 'TestSharedConfigUrl'
+ goal_state = self._get_goal_state(
+ http_client=http_client, certificates_url=certificates_url)
+ certificates_xml = goal_state.certificates_xml
+ self.assertEqual(1, http_client.get.call_count)
+ self.assertEqual(certificates_url, http_client.get.call_args[0][0])
+ self.assertTrue(http_client.get.call_args[1].get('secure', False))
+ self.assertEqual(http_client.get.return_value.contents,
+ certificates_xml)
+
+ def test_missing_certificates_skips_http_get(self):
+ http_client = mock.MagicMock()
+ goal_state = self._get_goal_state(
+ http_client=http_client, certificates_url=None)
+ certificates_xml = goal_state.certificates_xml
+ self.assertEqual(0, http_client.get.call_count)
+ self.assertIsNone(certificates_xml)
+
+
+class TestAzureEndpointHttpClient(TestCase):
+
+ regular_headers = {
+ 'x-ms-agent-name': 'WALinuxAgent',
+ 'x-ms-version': '2012-11-30',
+ }
+
+ def setUp(self):
+ super(TestAzureEndpointHttpClient, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.read_file_or_url = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'read_file_or_url'))
+
+ def test_non_secure_get(self):
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ url = 'MyTestUrl'
+ response = client.get(url, secure=False)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(mock.call(url, headers=self.regular_headers),
+ self.read_file_or_url.call_args)
+
+ def test_secure_get(self):
+ url = 'MyTestUrl'
+ certificate = mock.MagicMock()
+ expected_headers = self.regular_headers.copy()
+ expected_headers.update({
+ "x-ms-cipher-name": "DES_EDE3_CBC",
+ "x-ms-guest-agent-public-x509-cert": certificate,
+ })
+ client = azure_helper.AzureEndpointHttpClient(certificate)
+ response = client.get(url, secure=True)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(mock.call(url, headers=expected_headers),
+ self.read_file_or_url.call_args)
+
+ def test_post(self):
+ data = mock.MagicMock()
+ url = 'MyTestUrl'
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ response = client.post(url, data=data)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(
+ mock.call(url, data=data, headers=self.regular_headers),
+ self.read_file_or_url.call_args)
+
+ def test_post_with_extra_headers(self):
+ url = 'MyTestUrl'
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ extra_headers = {'test': 'header'}
+ client.post(url, extra_headers=extra_headers)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ expected_headers = self.regular_headers.copy()
+ expected_headers.update(extra_headers)
+ self.assertEqual(
+ mock.call(mock.ANY, data=mock.ANY, headers=expected_headers),
+ self.read_file_or_url.call_args)
+
+
+class TestOpenSSLManager(TestCase):
+
+ def setUp(self):
+ super(TestOpenSSLManager, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.subp = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'subp'))
+ try:
+ self.open = patches.enter_context(
+ mock.patch('__builtin__.open'))
+ except ImportError:
+ self.open = patches.enter_context(
+ mock.patch('builtins.open'))
+
+ @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
+ @mock.patch.object(azure_helper.tempfile, 'mkdtemp')
+ def test_openssl_manager_creates_a_tmpdir(self, mkdtemp):
+ manager = azure_helper.OpenSSLManager()
+ self.assertEqual(mkdtemp.return_value, manager.tmpdir)
+
+ def test_generate_certificate_uses_tmpdir(self):
+ subp_directory = {}
+
+ def capture_directory(*args, **kwargs):
+ subp_directory['path'] = os.getcwd()
+
+ self.subp.side_effect = capture_directory
+ manager = azure_helper.OpenSSLManager()
+ self.assertEqual(manager.tmpdir, subp_directory['path'])
+
+ @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
+ @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock())
+ @mock.patch.object(azure_helper.util, 'del_dir')
+ def test_clean_up(self, del_dir):
+ manager = azure_helper.OpenSSLManager()
+ manager.clean_up()
+ self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
+
+
+class TestWALinuxAgentShim(TestCase):
+
+ def setUp(self):
+ super(TestWALinuxAgentShim, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.AzureEndpointHttpClient = patches.enter_context(
+ mock.patch.object(azure_helper, 'AzureEndpointHttpClient'))
+ self.find_endpoint = patches.enter_context(
+ mock.patch.object(
+ azure_helper.WALinuxAgentShim, 'find_endpoint'))
+ self.GoalState = patches.enter_context(
+ mock.patch.object(azure_helper, 'GoalState'))
+ self.iid_from_shared_config_content = patches.enter_context(
+ mock.patch.object(azure_helper, 'iid_from_shared_config_content'))
+ self.OpenSSLManager = patches.enter_context(
+ mock.patch.object(azure_helper, 'OpenSSLManager'))
+ patches.enter_context(
+ mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock()))
+
+ def test_http_client_uses_certificate(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ self.assertEqual(
+ [mock.call(self.OpenSSLManager.return_value.certificate)],
+ self.AzureEndpointHttpClient.call_args_list)
+
+ def test_correct_url_used_for_goalstate(self):
+ self.find_endpoint.return_value = 'test_endpoint'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ get = self.AzureEndpointHttpClient.return_value.get
+ self.assertEqual(
+ [mock.call('http://test_endpoint/machine/?comp=goalstate')],
+ get.call_args_list)
+ self.assertEqual(
+ [mock.call(get.return_value.contents,
+ self.AzureEndpointHttpClient.return_value)],
+ self.GoalState.call_args_list)
+
+ def test_certificates_used_to_determine_public_keys(self):
+ shim = azure_helper.WALinuxAgentShim()
+ data = shim.register_with_azure_and_fetch_data()
+ self.assertEqual(
+ [mock.call(self.GoalState.return_value.certificates_xml)],
+ self.OpenSSLManager.return_value.parse_certificates.call_args_list)
+ self.assertEqual(
+ self.OpenSSLManager.return_value.parse_certificates.return_value,
+ data['public-keys'])
+
+ def test_absent_certificates_produces_empty_public_keys(self):
+ self.GoalState.return_value.certificates_xml = None
+ shim = azure_helper.WALinuxAgentShim()
+ data = shim.register_with_azure_and_fetch_data()
+ self.assertEqual([], data['public-keys'])
+
+ def test_instance_id_returned_in_data(self):
+ shim = azure_helper.WALinuxAgentShim()
+ data = shim.register_with_azure_and_fetch_data()
+ self.assertEqual(
+ [mock.call(self.GoalState.return_value.shared_config_xml)],
+ self.iid_from_shared_config_content.call_args_list)
+ self.assertEqual(self.iid_from_shared_config_content.return_value,
+ data['instance-id'])
+
+ def test_correct_url_used_for_report_ready(self):
+ self.find_endpoint.return_value = 'test_endpoint'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ expected_url = 'http://test_endpoint/machine?comp=health'
+ self.assertEqual(
+ [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)],
+ self.AzureEndpointHttpClient.return_value.post.call_args_list)
+
+ def test_goal_state_values_used_for_report_ready(self):
+ self.GoalState.return_value.incarnation = 'TestIncarnation'
+ self.GoalState.return_value.container_id = 'TestContainerId'
+ self.GoalState.return_value.instance_id = 'TestInstanceId'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ posted_document = (
+ self.AzureEndpointHttpClient.return_value.post.call_args[1]['data']
+ )
+ self.assertIn('TestIncarnation', posted_document)
+ self.assertIn('TestContainerId', posted_document)
+ self.assertIn('TestInstanceId', posted_document)
+
+ def test_clean_up_can_be_called_at_any_time(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.clean_up()
+
+ def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ shim.clean_up()
+ self.assertEqual(
+ 1, self.OpenSSLManager.return_value.clean_up.call_count)
+
+ def test_failure_to_fetch_goalstate_bubbles_up(self):
+ class SentinelException(Exception):
+ pass
+ self.AzureEndpointHttpClient.return_value.get.side_effect = (
+ SentinelException)
+ shim = azure_helper.WALinuxAgentShim()
+ self.assertRaises(SentinelException,
+ shim.register_with_azure_and_fetch_data)
+
+
+class TestGetMetadataFromFabric(TestCase):
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_data_from_shim_returned(self, shim):
+ ret = azure_helper.get_metadata_from_fabric()
+ self.assertEqual(
+ shim.return_value.register_with_azure_and_fetch_data.return_value,
+ ret)
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_success_calls_clean_up(self, shim):
+ azure_helper.get_metadata_from_fabric()
+ self.assertEqual(1, shim.return_value.clean_up.call_count)
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_failure_in_registration_calls_clean_up(self, shim):
+ class SentinelException(Exception):
+ pass
+ shim.return_value.register_with_azure_and_fetch_data.side_effect = (
+ SentinelException)
+ self.assertRaises(SentinelException,
+ azure_helper.get_metadata_from_fabric)
+ self.assertEqual(1, shim.return_value.clean_up.call_count)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 4280abc4..1fb100f7 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -113,10 +113,6 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(GCE_META.get('instance/attributes/user-data'),
self.ds.get_userdata_raw())
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
-
# test partial metadata (missing user-data in particular)
@httpretty.activate
def test_metadata_partial(self):
@@ -141,3 +137,48 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
decoded = b64decode(
GCE_META_ENCODING.get('instance/attributes/user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
+
+ @httpretty.activate
+ def test_missing_required_keys_return_false(self):
+ for required_key in ['instance/id', 'instance/zone',
+ 'instance/hostname']:
+ meta = GCE_META_PARTIAL.copy()
+ del meta[required_key]
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.assertEqual(False, self.ds.get_data())
+ httpretty.reset()
+
+ @httpretty.activate
+ def test_project_level_ssh_keys_are_used(self):
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback())
+ self.ds.get_data()
+
+ # we expect a list of public ssh keys with user names stripped
+ self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
+ self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_ssh_keys_are_used(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertIn(key_content, self.ds.get_public_ssh_keys())
+
+ @httpretty.activate
+ def test_instance_level_keys_replace_project_level_keys(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ httpretty.register_uri(httpretty.GET, MD_URL_RE,
+ body=_new_request_callback(meta))
+ self.ds.get_data()
+
+ self.assertEqual([key_content], self.ds.get_public_ssh_keys())
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 28b41eaf..adee9019 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -36,8 +36,6 @@ from binascii import crc32
import serial
import six
-import six
-
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
index 02cad8b2..1ed185ca 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -7,7 +7,10 @@ import os
import re
import shutil
import tempfile
-import unittest
+
+
+def load_tfile_or_url(*args, **kwargs):
+ return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents))
class TestAptProxyConfig(TestCase):
@@ -30,7 +33,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = util.load_tfile_or_url(self.pfile)
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_http_proxy_written(self):
@@ -40,7 +43,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = util.load_tfile_or_url(self.pfile)
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_all_proxy_written(self):
@@ -58,7 +61,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = util.load_tfile_or_url(self.pfile)
+ contents = load_tfile_or_url(self.pfile)
for ptype, pval in values.items():
self.assertTrue(self._search_apt_config(contents, ptype, pval))
@@ -74,7 +77,7 @@ class TestAptProxyConfig(TestCase):
cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
- contents = util.load_tfile_or_url(self.pfile)
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "foo"))
def test_config_written(self):
@@ -86,14 +89,14 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(os.path.isfile(self.cfile))
self.assertFalse(os.path.isfile(self.pfile))
- self.assertEqual(util.load_tfile_or_url(self.cfile), payload)
+ self.assertEqual(load_tfile_or_url(self.cfile), payload)
def test_config_replaced(self):
util.write_file(self.pfile, "content doesnt matter")
cc_apt_configure.apply_apt_config({'apt_config': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.cfile))
- self.assertEqual(util.load_tfile_or_url(self.cfile), "foo")
+ self.assertEqual(load_tfile_or_url(self.cfile), "foo")
def test_config_deleted(self):
# if no 'apt_config' is provided, delete any previously written file
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
new file mode 100644
index 00000000..ddef8d48
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_disk_setup.py
@@ -0,0 +1,30 @@
+from cloudinit.config import cc_disk_setup
+from ..helpers import ExitStack, mock, TestCase
+
+
+class TestIsDiskUsed(TestCase):
+
+ def setUp(self):
+ super(TestIsDiskUsed, self).setUp()
+ self.patches = ExitStack()
+ mod_name = 'cloudinit.config.cc_disk_setup'
+ self.enumerate_disk = self.patches.enter_context(
+ mock.patch('{0}.enumerate_disk'.format(mod_name)))
+ self.check_fs = self.patches.enter_context(
+ mock.patch('{0}.check_fs'.format(mod_name)))
+
+ def test_multiple_child_nodes_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_valid_filesystem_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (
+ mock.MagicMock(), 'ext4', mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_one_child_nodes_and_no_fs_returns_false(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock()))
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index f3109bac..eceb14d9 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -38,7 +38,6 @@ class TestInstallPackages(t_help.TestCase):
if 'args' not in kwargs:
kwargs['args'] = args[0]
self.subp_called.append(kwargs)
- snap_cmds = []
args = kwargs['args']
# here we basically parse the snappy command invoked
# and append to snapcmds a list of (mode, pkg, config)
@@ -117,9 +116,6 @@ class TestInstallPackages(t_help.TestCase):
def test_package_ops_common_filename(self):
# fish package name from filename
# package names likely look like: pkgname.namespace_version_arch.snap
- fname = "xkcd-webserver.canonical_0.3.4_all.snap"
- name = "xkcd-webserver.canonical"
- shortname = "xkcd-webserver"
# find filenames
self.populate_tmp(
@@ -165,7 +161,6 @@ class TestInstallPackages(t_help.TestCase):
'ubuntu-core': {'c1': 'c2'},
'notinstalled.smoser': {'s1': 's2'},
}
- cfg = {'config-example-k1': 'config-example-k2'}
ret = get_package_ops(
packages=['config-example.canonical'], configs=cfgs,
installed=['config-example.smoser', 'pkg1.canonical',
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index cf7c03b0..0c19a2c2 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -18,10 +18,6 @@
from __future__ import print_function
-import sys
-import six
-import unittest
-
from . import helpers as test_helpers
import textwrap
@@ -30,6 +26,7 @@ from cloudinit import templater
try:
import Cheetah
HAS_CHEETAH = True
+ Cheetah # make pyflakes happy, as Cheetah is not used here
except ImportError:
HAS_CHEETAH = False
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 1619b5d2..95990165 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -459,4 +459,21 @@ class TestMessageFromString(helpers.TestCase):
roundtripped = util.message_from_string(u'\n').as_string()
self.assertNotIn('\x00', roundtripped)
+
+class TestReadSeeded(helpers.TestCase):
+ def setUp(self):
+ super(TestReadSeeded, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def test_unicode_not_messed_up(self):
+ ud = b"userdatablob"
+ helpers.populate_dir(
+ self.tmp, {'meta-data': "key1: val1", 'user-data': ud})
+ sdir = self.tmp + os.path.sep
+ (found_md, found_ud) = util.read_seeded(sdir)
+
+ self.assertEqual(found_md, {'key1': 'val1'})
+ self.assertEqual(found_ud, ud)
+
# vi: ts=4 expandtab