summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorScott Moser <smoser@brickies.net>2016-11-04 13:46:09 -0400
committerScott Moser <smoser@brickies.net>2016-11-04 13:46:09 -0400
commitb380e16183446f2e39f47a3c0804d2081714acb2 (patch)
treea5e05b4a22553769d6eaa7e5fa93e3401a758b2b /tests
parent882b22e024733e17757fdbe36ba2a3672c6ebe06 (diff)
parenta1cdebdea65ccd827060c823146992bba9debe19 (diff)
downloadvyos-cloud-init-b380e16183446f2e39f47a3c0804d2081714acb2.tar.gz
vyos-cloud-init-b380e16183446f2e39f47a3c0804d2081714acb2.zip
merge from master at 0.7.8-34-ga1cdebd
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/__init__.py9
-rw-r--r--tests/unittests/helpers.py8
-rw-r--r--tests/unittests/test_data.py45
-rw-r--r--tests/unittests/test_datasource/test_aliyun.py148
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py338
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py23
-rwxr-xr-x[-rw-r--r--]tests/unittests/test_distros/test_user_data_normalize.py70
-rw-r--r--tests/unittests/test_handler/test_handler_apt_conf_v1.py2
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py4
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py51
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py293
-rw-r--r--tests/unittests/test_net.py43
-rw-r--r--tests/unittests/test_util.py42
13 files changed, 964 insertions, 112 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py
index e69de29b..1b34b5af 100644
--- a/tests/unittests/__init__.py
+++ b/tests/unittests/__init__.py
@@ -0,0 +1,9 @@
+try:
+ # For test cases, avoid the following UserWarning to stderr:
+ # You don't have the C version of NameMapper installed ...
+ from Cheetah import NameMapper as _nm
+ _nm.C_VERSION = True
+except ImportError:
+ pass
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 1cdc05a1..a2355a79 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -205,6 +205,14 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
self.patched_funcs.enter_context(
mock.patch.object(sys, 'stderr', stderr))
+ def reRoot(self, root=None):
+ if root is None:
+ root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, root)
+ self.patchUtils(root)
+ self.patchOS(root)
+ return root
+
def import_httpretty():
"""Import HTTPretty and monkey patch Python 3.4 issue.
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 13db8a4c..55d9b93f 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -3,8 +3,6 @@
import gzip
import logging
import os
-import shutil
-import tempfile
try:
from unittest import mock
@@ -98,10 +96,7 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
ci = stages.Init()
ci.datasource = FakeDataSource(blob)
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
+ self.reRoot()
ci.fetch()
ci.consume_data()
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
@@ -127,9 +122,7 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
+ self.reRoot()
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
initer.read_cfg()
@@ -167,9 +160,7 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
+ self.reRoot()
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
initer.read_cfg()
@@ -212,12 +203,9 @@ c: d
message.attach(message_cc)
message.attach(message_jp)
+ self.reRoot()
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
ci.fetch()
ci.consume_data()
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
@@ -245,9 +233,7 @@ name: user
run:
- z
'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
+ self.reRoot()
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
initer.read_cfg()
@@ -281,9 +267,7 @@ vendor_data:
enabled: True
prefix: /bin/true
'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
+ new_root = self.reRoot()
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
initer.read_cfg()
@@ -342,10 +326,7 @@ p: 1
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
+ self.reRoot()
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
None)
for i, m in enumerate(messages):
@@ -365,6 +346,7 @@ p: 1
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
+ self.reRoot()
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
@@ -402,10 +384,7 @@ c: 4
message.attach(gzip_part(base_content2))
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
+ self.reRoot()
ci.fetch()
ci.consume_data()
contents = util.load_file(ci.paths.get_ipath("cloud_config"))
@@ -418,6 +397,7 @@ c: 4
def test_mime_text_plain(self):
"""Mime message of type text/plain is ignored but shows warning."""
+ self.reRoot()
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
@@ -435,6 +415,7 @@ c: 4
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script."""
+ self.reRoot()
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
ci.datasource = FakeDataSource(script)
@@ -453,6 +434,7 @@ c: 4
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script."""
+ self.reRoot()
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "x-shellscript")
@@ -473,6 +455,7 @@ c: 4
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script."""
+ self.reRoot()
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "plain")
@@ -493,6 +476,7 @@ c: 4
def test_mime_application_octet_stream(self):
"""Mime type application/octet-stream is ignored but shows warning."""
+ self.reRoot()
ci = stages.Init()
message = MIMEBase("application", "octet-stream")
message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
@@ -516,6 +500,7 @@ c: 4
{'content': non_decodable}]
message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
+ self.reRoot()
ci = stages.Init()
ci.datasource = FakeDataSource(message)
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py
new file mode 100644
index 00000000..6f1de072
--- /dev/null
+++ b/tests/unittests/test_datasource/test_aliyun.py
@@ -0,0 +1,148 @@
+import functools
+import httpretty
+import os
+
+from .. import helpers as test_helpers
+from cloudinit import helpers
+from cloudinit.sources import DataSourceAliYun as ay
+
+DEFAULT_METADATA = {
+ 'instance-id': 'aliyun-test-vm-00',
+ 'eipv4': '10.0.0.1',
+ 'hostname': 'test-hostname',
+ 'image-id': 'm-test',
+ 'launch-index': '0',
+ 'mac': '00:16:3e:00:00:00',
+ 'network-type': 'vpc',
+ 'private-ipv4': '192.168.0.1',
+ 'serial-number': 'test-string',
+ 'vpc-cidr-block': '192.168.0.0/16',
+ 'vpc-id': 'test-vpc',
+ 'vswitch-id': 'test-vpc',
+ 'vswitch-cidr-block': '192.168.0.0/16',
+ 'zone-id': 'test-zone-1',
+ 'ntp-conf': {'ntp_servers': [
+ 'ntp1.aliyun.com',
+ 'ntp2.aliyun.com',
+ 'ntp3.aliyun.com']},
+ 'source-address': ['http://mirrors.aliyun.com',
+ 'http://mirrors.aliyuncs.com'],
+ 'public-keys': {'key-pair-1': {'openssh-key': 'ssh-rsa AAAAB3...'},
+ 'key-pair-2': {'openssh-key': 'ssh-rsa AAAAB3...'}}
+}
+
+DEFAULT_USERDATA = """\
+#cloud-config
+
+hostname: localhost"""
+
+
+def register_mock_metaserver(base_url, data):
+ def register_helper(register, base_url, body):
+ if isinstance(body, str):
+ register(base_url, body)
+ elif isinstance(body, list):
+ register(base_url.rstrip('/'), '\n'.join(body) + '\n')
+ elif isinstance(body, dict):
+ vals = []
+ for k, v in body.items():
+ if isinstance(v, (str, list)):
+ suffix = k.rstrip('/')
+ else:
+ suffix = k.rstrip('/') + '/'
+ vals.append(suffix)
+ url = base_url.rstrip('/') + '/' + suffix
+ register_helper(register, url, v)
+ register(base_url, '\n'.join(vals) + '\n')
+
+ register = functools.partial(httpretty.register_uri, httpretty.GET)
+ register_helper(register, base_url, data)
+
+
+class TestAliYunDatasource(test_helpers.HttprettyTestCase):
+ def setUp(self):
+ super(TestAliYunDatasource, self).setUp()
+ cfg = {'datasource': {'AliYun': {'timeout': '1', 'max_wait': '1'}}}
+ distro = {}
+ paths = helpers.Paths({})
+ self.ds = ay.DataSourceAliYun(cfg, distro, paths)
+ self.metadata_address = self.ds.metadata_urls[0]
+ self.api_ver = self.ds.api_ver
+
+ @property
+ def default_metadata(self):
+ return DEFAULT_METADATA
+
+ @property
+ def default_userdata(self):
+ return DEFAULT_USERDATA
+
+ @property
+ def metadata_url(self):
+ return os.path.join(self.metadata_address,
+ self.api_ver, 'meta-data') + '/'
+
+ @property
+ def userdata_url(self):
+ return os.path.join(self.metadata_address,
+ self.api_ver, 'user-data')
+
+ def regist_default_server(self):
+ register_mock_metaserver(self.metadata_url, self.default_metadata)
+ register_mock_metaserver(self.userdata_url, self.default_userdata)
+
+ def _test_get_data(self):
+ self.assertEqual(self.ds.metadata, self.default_metadata)
+ self.assertEqual(self.ds.userdata_raw,
+ self.default_userdata.encode('utf8'))
+
+ def _test_get_sshkey(self):
+ pub_keys = [v['openssh-key'] for (_, v) in
+ self.default_metadata['public-keys'].items()]
+ self.assertEqual(self.ds.get_public_ssh_keys(), pub_keys)
+
+ def _test_get_iid(self):
+ self.assertEqual(self.default_metadata['instance-id'],
+ self.ds.get_instance_id())
+
+ def _test_host_name(self):
+ self.assertEqual(self.default_metadata['hostname'],
+ self.ds.get_hostname())
+
+ @httpretty.activate
+ def test_with_mock_server(self):
+ self.regist_default_server()
+ self.ds.get_data()
+ self._test_get_data()
+ self._test_get_sshkey()
+ self._test_get_iid()
+ self._test_host_name()
+
+ def test_parse_public_keys(self):
+ public_keys = {}
+ self.assertEqual(ay.parse_public_keys(public_keys), [])
+
+ public_keys = {'key-pair-0': 'ssh-key-0'}
+ self.assertEqual(ay.parse_public_keys(public_keys),
+ [public_keys['key-pair-0']])
+
+ public_keys = {'key-pair-0': 'ssh-key-0', 'key-pair-1': 'ssh-key-1'}
+ self.assertEqual(set(ay.parse_public_keys(public_keys)),
+ set([public_keys['key-pair-0'],
+ public_keys['key-pair-1']]))
+
+ public_keys = {'key-pair-0': ['ssh-key-0', 'ssh-key-1']}
+ self.assertEqual(ay.parse_public_keys(public_keys),
+ public_keys['key-pair-0'])
+
+ public_keys = {'key-pair-0': {'openssh-key': []}}
+ self.assertEqual(ay.parse_public_keys(public_keys), [])
+
+ public_keys = {'key-pair-0': {'openssh-key': 'ssh-key-0'}}
+ self.assertEqual(ay.parse_public_keys(public_keys),
+ [public_keys['key-pair-0']['openssh-key']])
+
+ public_keys = {'key-pair-0': {'openssh-key': ['ssh-key-0',
+ 'ssh-key-1']}}
+ self.assertEqual(ay.parse_public_keys(public_keys),
+ public_keys['key-pair-0']['openssh-key'])
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index f5d2ef35..7bde0820 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -20,25 +20,123 @@ import json
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceDigitalOcean
+from cloudinit.sources.helpers import digitalocean
-from .. import helpers as test_helpers
-from ..helpers import HttprettyTestCase
-
-httpretty = test_helpers.import_httpretty()
+from ..helpers import mock, TestCase
DO_MULTIPLE_KEYS = ["ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@do.co",
"ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@do.co"]
DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... test@do.co"
-DO_META = {
- 'user_data': 'user_data_here',
- 'vendor_data': 'vendor_data_here',
- 'public_keys': DO_SINGLE_KEY,
- 'region': 'nyc3',
- 'id': '2000000',
- 'hostname': 'cloudinit-test',
+# the following JSON was taken from droplet (that's why its a string)
+DO_META = json.loads("""
+{
+ "droplet_id": "22532410",
+ "hostname": "utl-96268",
+ "vendor_data": "vendordata goes here",
+ "user_data": "userdata goes here",
+ "public_keys": "",
+ "auth_key": "authorization_key",
+ "region": "nyc3",
+ "interfaces": {
+ "private": [
+ {
+ "ipv4": {
+ "ip_address": "10.132.6.205",
+ "netmask": "255.255.0.0",
+ "gateway": "10.132.0.1"
+ },
+ "mac": "04:01:57:d1:9e:02",
+ "type": "private"
+ }
+ ],
+ "public": [
+ {
+ "ipv4": {
+ "ip_address": "192.0.0.20",
+ "netmask": "255.255.255.0",
+ "gateway": "104.236.0.1"
+ },
+ "ipv6": {
+ "ip_address": "2604:A880:0800:0000:1000:0000:0000:0000",
+ "cidr": 64,
+ "gateway": "2604:A880:0800:0000:0000:0000:0000:0001"
+ },
+ "anchor_ipv4": {
+ "ip_address": "10.0.0.5",
+ "netmask": "255.255.0.0",
+ "gateway": "10.0.0.1"
+ },
+ "mac": "04:01:57:d1:9e:01",
+ "type": "public"
+ }
+ ]
+ },
+ "floating_ip": {
+ "ipv4": {
+ "active": false
+ }
+ },
+ "dns": {
+ "nameservers": [
+ "2001:4860:4860::8844",
+ "2001:4860:4860::8888",
+ "8.8.8.8"
+ ]
+ }
+}
+""")
+
+# This has no private interface
+DO_META_2 = {
+ "droplet_id": 27223699,
+ "hostname": "smtest1",
+ "vendor_data": "\n".join([
+ ('"Content-Type: multipart/mixed; '
+ 'boundary=\"===============8645434374073493512==\"'),
+ 'MIME-Version: 1.0',
+ '',
+ '--===============8645434374073493512==',
+ 'MIME-Version: 1.0'
+ 'Content-Type: text/cloud-config; charset="us-ascii"'
+ 'Content-Transfer-Encoding: 7bit'
+ 'Content-Disposition: attachment; filename="cloud-config"'
+ '',
+ '#cloud-config',
+ 'disable_root: false',
+ 'manage_etc_hosts: true',
+ '',
+ '',
+ '--===============8645434374073493512=='
+ ]),
+ "public_keys": [
+ "ssh-rsa AAAAB3NzaN...N3NtHw== smoser@brickies"
+ ],
+ "auth_key": "88888888888888888888888888888888",
+ "region": "nyc3",
+ "interfaces": {
+ "public": [{
+ "ipv4": {
+ "ip_address": "45.55.249.133",
+ "netmask": "255.255.192.0",
+ "gateway": "45.55.192.1"
+ },
+ "anchor_ipv4": {
+ "ip_address": "10.17.0.5",
+ "netmask": "255.255.0.0",
+ "gateway": "10.17.0.1"
+ },
+ "mac": "ae:cc:08:7c:88:00",
+ "type": "public"
+ }]
+ },
+ "floating_ip": {"ipv4": {"active": True, "ip_address": "138.197.59.92"}},
+ "dns": {"nameservers": ["8.8.8.8", "8.8.4.4"]},
+ "tags": None,
}
+DO_META['public_keys'] = DO_SINGLE_KEY
+
MD_URL = 'http://169.254.169.254/metadata/v1.json'
@@ -46,69 +144,189 @@ def _mock_dmi():
return (True, DO_META.get('id'))
-def _request_callback(method, uri, headers):
- return (200, headers, json.dumps(DO_META))
-
-
-class TestDataSourceDigitalOcean(HttprettyTestCase):
+class TestDataSourceDigitalOcean(TestCase):
"""
Test reading the meta-data
"""
- def setUp(self):
- self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
- settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- self.ds._get_sysinfo = _mock_dmi
- super(TestDataSourceDigitalOcean, self).setUp()
-
- @httpretty.activate
- def test_connection(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL,
- body=json.dumps(DO_META))
-
- success = self.ds.get_data()
- self.assertTrue(success)
-
- @httpretty.activate
- def test_metadata(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL,
- body=_request_callback)
- self.ds.get_data()
+ def get_ds(self, get_sysinfo=_mock_dmi):
+ ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
+ settings.CFG_BUILTIN, None, helpers.Paths({}))
+ ds.use_ip4LL = False
+ if get_sysinfo is not None:
+ ds._get_sysinfo = get_sysinfo
+ return ds
- self.assertEqual(DO_META.get('user_data'),
- self.ds.get_userdata_raw())
+ @mock.patch('cloudinit.sources.helpers.digitalocean.read_sysinfo')
+ def test_returns_false_not_on_docean(self, m_read_sysinfo):
+ m_read_sysinfo.return_value = (False, None)
+ ds = self.get_ds(get_sysinfo=None)
+ self.assertEqual(False, ds.get_data())
+ self.assertTrue(m_read_sysinfo.called)
- self.assertEqual(DO_META.get('vendor_data'),
- self.ds.get_vendordata_raw())
+ @mock.patch('cloudinit.sources.helpers.digitalocean.read_metadata')
+ def test_metadata(self, mock_readmd):
+ mock_readmd.return_value = DO_META.copy()
- self.assertEqual(DO_META.get('region'),
- self.ds.availability_zone)
+ ds = self.get_ds()
+ ret = ds.get_data()
+ self.assertTrue(ret)
- self.assertEqual(DO_META.get('id'),
- self.ds.get_instance_id())
+ self.assertTrue(mock_readmd.called)
- self.assertEqual(DO_META.get('hostname'),
- self.ds.get_hostname())
+ self.assertEqual(DO_META.get('user_data'), ds.get_userdata_raw())
+ self.assertEqual(DO_META.get('vendor_data'), ds.get_vendordata_raw())
+ self.assertEqual(DO_META.get('region'), ds.availability_zone)
+ self.assertEqual(DO_META.get('droplet_id'), ds.get_instance_id())
+ self.assertEqual(DO_META.get('hostname'), ds.get_hostname())
# Single key
self.assertEqual([DO_META.get('public_keys')],
- self.ds.get_public_ssh_keys())
+ ds.get_public_ssh_keys())
- self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
+ self.assertIsInstance(ds.get_public_ssh_keys(), list)
- @httpretty.activate
- def test_multiple_ssh_keys(self):
- DO_META['public_keys'] = DO_MULTIPLE_KEYS
- httpretty.register_uri(
- httpretty.GET, MD_URL,
- body=_request_callback)
- self.ds.get_data()
+ @mock.patch('cloudinit.sources.helpers.digitalocean.read_metadata')
+ def test_multiple_ssh_keys(self, mock_readmd):
+ metadata = DO_META.copy()
+ metadata['public_keys'] = DO_MULTIPLE_KEYS
+ mock_readmd.return_value = metadata.copy()
+
+ ds = self.get_ds()
+ ret = ds.get_data()
+ self.assertTrue(ret)
+
+ self.assertTrue(mock_readmd.called)
# Multiple keys
- self.assertEqual(DO_META.get('public_keys'),
- self.ds.get_public_ssh_keys())
+ self.assertEqual(metadata['public_keys'], ds.get_public_ssh_keys())
+ self.assertIsInstance(ds.get_public_ssh_keys(), list)
+
+
+class TestNetworkConvert(TestCase):
+
+ def _get_networking(self):
+ netcfg = digitalocean.convert_network_configuration(
+ DO_META['interfaces'], DO_META['dns']['nameservers'])
+ self.assertIn('config', netcfg)
+ return netcfg
+
+ def test_networking_defined(self):
+ netcfg = self._get_networking()
+ self.assertIsNotNone(netcfg)
+
+ for nic_def in netcfg.get('config'):
+ print(json.dumps(nic_def, indent=3))
+ n_type = nic_def.get('type')
+ n_subnets = nic_def.get('type')
+ n_name = nic_def.get('name')
+ n_mac = nic_def.get('mac_address')
+
+ self.assertIsNotNone(n_type)
+ self.assertIsNotNone(n_subnets)
+ self.assertIsNotNone(n_name)
+ self.assertIsNotNone(n_mac)
+
+ def _get_nic_definition(self, int_type, expected_name):
+ """helper function to return if_type (i.e. public) and the expected
+ name used by cloud-init (i.e eth0)"""
+ netcfg = self._get_networking()
+ meta_def = (DO_META.get('interfaces')).get(int_type)[0]
+
+ self.assertEqual(int_type, meta_def.get('type'))
+
+ for nic_def in netcfg.get('config'):
+ print(nic_def)
+ if nic_def.get('name') == expected_name:
+ return nic_def, meta_def
+
+ def _get_match_subn(self, subnets, ip_addr):
+ """get the matching subnet definition based on ip address"""
+ for subn in subnets:
+ address = subn.get('address')
+ self.assertIsNotNone(address)
+
+ # equals won't work because of ipv6 addressing being in
+ # cidr notation, i.e fe00::1/64
+ if ip_addr in address:
+ print(json.dumps(subn, indent=3))
+ return subn
+
+ def test_public_interface_defined(self):
+ """test that the public interface is defined as eth0"""
+ (nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
+ self.assertEqual('eth0', nic_def.get('name'))
+ self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address'))
+ self.assertEqual('physical', nic_def.get('type'))
+
+ def test_private_interface_defined(self):
+ """test that the private interface is defined as eth1"""
+ (nic_def, meta_def) = self._get_nic_definition('private', 'eth1')
+ self.assertEqual('eth1', nic_def.get('name'))
+ self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address'))
+ self.assertEqual('physical', nic_def.get('type'))
+
+ def _check_dns_nameservers(self, subn_def):
+ self.assertIn('dns_nameservers', subn_def)
+ expected_nameservers = DO_META['dns']['nameservers']
+ nic_nameservers = subn_def.get('dns_nameservers')
+ self.assertEqual(expected_nameservers, nic_nameservers)
+
+ def test_public_interface_ipv6(self):
+ """test public ipv6 addressing"""
+ (nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
+ ipv6_def = meta_def.get('ipv6')
+ self.assertIsNotNone(ipv6_def)
+
+ subn_def = self._get_match_subn(nic_def.get('subnets'),
+ ipv6_def.get('ip_address'))
+
+ cidr_notated_address = "{0}/{1}".format(ipv6_def.get('ip_address'),
+ ipv6_def.get('cidr'))
+
+ self.assertEqual(cidr_notated_address, subn_def.get('address'))
+ self.assertEqual(ipv6_def.get('gateway'), subn_def.get('gateway'))
+ self._check_dns_nameservers(subn_def)
+
+ def test_public_interface_ipv4(self):
+ """test public ipv4 addressing"""
+ (nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
+ ipv4_def = meta_def.get('ipv4')
+ self.assertIsNotNone(ipv4_def)
+
+ subn_def = self._get_match_subn(nic_def.get('subnets'),
+ ipv4_def.get('ip_address'))
+
+ self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask'))
+ self.assertEqual(ipv4_def.get('gateway'), subn_def.get('gateway'))
+ self._check_dns_nameservers(subn_def)
+
+ def test_public_interface_anchor_ipv4(self):
+ """test public ipv4 addressing"""
+ (nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
+ ipv4_def = meta_def.get('anchor_ipv4')
+ self.assertIsNotNone(ipv4_def)
+
+ subn_def = self._get_match_subn(nic_def.get('subnets'),
+ ipv4_def.get('ip_address'))
+
+ self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask'))
+ self.assertNotIn('gateway', subn_def)
+
+ def test_convert_without_private(self):
+ netcfg = digitalocean.convert_network_configuration(
+ DO_META_2['interfaces'], DO_META_2['dns']['nameservers'])
- self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
+ byname = {}
+ for i in netcfg['config']:
+ if 'name' in i:
+ if i['name'] in byname:
+ raise ValueError("name '%s' in config twice: %s" %
+ (i['name'], netcfg))
+ byname[i['name']] = i
+ self.assertTrue('eth0' in byname)
+ self.assertTrue('subnets' in byname['eth0'])
+ eth0 = byname['eth0']
+ self.assertEqual(
+ sorted(['45.55.249.133', '10.17.0.5']),
+ sorted([i['address'] for i in eth0['subnets']]))
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index d796f030..ce5b5550 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,7 +1,7 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from ..helpers import TestCase, populate_dir
+from ..helpers import mock, populate_dir, TestCase
import os
import pwd
@@ -31,12 +31,7 @@ SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
HOSTNAME = 'foo.example.com'
PUBLIC_IP = '10.0.0.3'
-CMD_IP_OUT = '''\
-1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
- link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
-2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
- link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
-'''
+DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(TestCase):
@@ -233,18 +228,19 @@ class TestOpenNebulaDataSource(TestCase):
class TestOpenNebulaNetwork(unittest.TestCase):
- def setUp(self):
- super(TestOpenNebulaNetwork, self).setUp()
+ system_nics = {'02:00:0a:12:01:01': 'eth0'}
def test_lo(self):
- net = ds.OpenNebulaNetwork('', {})
+ net = ds.OpenNebulaNetwork(context={}, system_nics_by_mac={})
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
''')
- def test_eth0(self):
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_eth0(self, m_get_phys_by_mac):
+ m_get_phys_by_mac.return_value = self.system_nics
+ net = ds.OpenNebulaNetwork({})
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
@@ -267,7 +263,8 @@ iface eth0 inet static
'ETH0_DNS': '1.2.3.6 1.2.3.7'
}
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
+ net = ds.OpenNebulaNetwork(context,
+ system_nics_by_mac=self.system_nics)
self.assertEqual(net.gen_conf(), u'''\
auto lo
iface lo inet loopback
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index a887a930..33bf922d 100644..100755
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -1,8 +1,10 @@
from cloudinit import distros
+from cloudinit.distros import ug_util
from cloudinit import helpers
from cloudinit import settings
from ..helpers import TestCase
+import mock
bcfg = {
@@ -29,7 +31,7 @@ class TestUGNormalize(TestCase):
return distro
def _norm(self, cfg, distro):
- return distros.normalize_users_groups(cfg, distro)
+ return ug_util.normalize_users_groups(cfg, distro)
def test_group_dict(self):
distro = self._make_distro('ubuntu')
@@ -236,7 +238,7 @@ class TestUGNormalize(TestCase):
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
- (name, config) = distros.extract_default(users)
+ (name, config) = ug_util.extract_default(users)
self.assertEqual(name, 'bob')
expected_config = {}
def_config = None
@@ -295,3 +297,67 @@ class TestUGNormalize(TestCase):
self.assertIn('bob', users)
self.assertEqual({'default': False}, users['joe'])
self.assertEqual({'default': False}, users['bob'])
+
+ @mock.patch('cloudinit.util.subp')
+ def test_create_snap_user(self, mock_subp):
+ mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
+ '')]
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'snapuser': 'joe@joe.com'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com']
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, 'joe')
+
+ @mock.patch('cloudinit.util.subp')
+ def test_create_snap_user_known(self, mock_subp):
+ mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
+ '')]
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ username = distro.create_user(user, **config)
+
+ snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known',
+ 'joe@joe.com']
+ mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
+ self.assertEqual(username, 'joe')
+
+ @mock.patch('cloudinit.util.system_is_snappy')
+ @mock.patch('cloudinit.util.is_group')
+ @mock.patch('cloudinit.util.subp')
+ def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp,
+ mock_snappy):
+ mock_isgrp.return_value = False
+ mock_subp.return_value = True
+ mock_snappy.return_value = True
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe', 'groups': 'users', 'create_groups': True},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ for (user, config) in users.items():
+ print('user=%s config=%s' % (user, config))
+ distro.add_user(user, **config)
+
+ groupcmd = ['groupadd', 'users', '--extrausers']
+ addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m']
+
+ mock_subp.assert_any_call(groupcmd)
+ mock_subp.assert_any_call(addcmd, logstring=addcmd)
diff --git a/tests/unittests/test_handler/test_handler_apt_conf_v1.py b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
index 45714efd..64acc3e0 100644
--- a/tests/unittests/test_handler/test_handler_apt_conf_v1.py
+++ b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
@@ -118,7 +118,7 @@ class TestConversion(TestCase):
def test_convert_with_apt_mirror(self):
mirror = 'http://my.mirror/ubuntu'
f = cc_apt_configure.convert_to_v3_apt_format({'apt_mirror': mirror})
- self.assertIn(mirror, {m['uri'] for m in f['apt']['primary']})
+ self.assertIn(mirror, set(m['uri'] for m in f['apt']['primary']))
def test_no_old_content(self):
mirror = 'http://my.mirror/ubuntu'
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
index e653488a..e28067de 100644
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -81,11 +81,11 @@ class TestConfig(TestCase):
self.cloud = cloud.Cloud(None, self.paths, None, None, None)
self.log = logging.getLogger("TestConfig")
self.args = []
- os.environ = {}
self.cloud_init = None
self.handle = cc_growpart.handle
+ @mock.patch.dict("os.environ", clear=True)
def test_no_resizers_auto_is_fine(self):
with mock.patch.object(
util, 'subp',
@@ -98,6 +98,7 @@ class TestConfig(TestCase):
mockobj.assert_called_once_with(
['growpart', '--help'], env={'LANG': 'C'})
+ @mock.patch.dict("os.environ", clear=True)
def test_no_resizers_mode_growpart_is_exception(self):
with mock.patch.object(
util, 'subp',
@@ -110,6 +111,7 @@ class TestConfig(TestCase):
mockobj.assert_called_once_with(
['growpart', '--help'], env={'LANG': 'C'})
+ @mock.patch.dict("os.environ", clear=True)
def test_mode_auto_prefers_growpart(self):
with mock.patch.object(
util, 'subp',
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
index 6f90defb..14366a10 100644
--- a/tests/unittests/test_handler/test_handler_lxd.py
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -132,3 +132,54 @@ class TestLxd(t_help.TestCase):
cc_lxd.bridge_to_debconf(data),
{"lxd/setup-bridge": "false",
"lxd/bridge-name": ""})
+
+ def test_lxd_cmd_new_full(self):
+ data = {"mode": "new",
+ "name": "testbr0",
+ "ipv4_address": "10.0.8.1",
+ "ipv4_netmask": "24",
+ "ipv4_dhcp_first": "10.0.8.2",
+ "ipv4_dhcp_last": "10.0.8.254",
+ "ipv4_dhcp_leases": "250",
+ "ipv4_nat": "true",
+ "ipv6_address": "fd98:9e0:3744::1",
+ "ipv6_netmask": "64",
+ "ipv6_nat": "true",
+ "domain": "lxd"}
+ self.assertEqual(
+ cc_lxd.bridge_to_cmd(data),
+ (["lxc", "network", "create", "testbr0",
+ "ipv4.address=10.0.8.1/24", "ipv4.nat=true",
+ "ipv4.dhcp.ranges=10.0.8.2-10.0.8.254",
+ "ipv6.address=fd98:9e0:3744::1/64",
+ "ipv6.nat=true", "dns.domain=lxd",
+ "--force-local"],
+ ["lxc", "network", "attach-profile",
+ "testbr0", "default", "eth0", "--force-local"]))
+
+ def test_lxd_cmd_new_partial(self):
+ data = {"mode": "new",
+ "ipv6_address": "fd98:9e0:3744::1",
+ "ipv6_netmask": "64",
+ "ipv6_nat": "true"}
+ self.assertEqual(
+ cc_lxd.bridge_to_cmd(data),
+ (["lxc", "network", "create", "lxdbr0", "ipv4.address=none",
+ "ipv6.address=fd98:9e0:3744::1/64", "ipv6.nat=true",
+ "--force-local"],
+ ["lxc", "network", "attach-profile",
+ "lxdbr0", "default", "eth0", "--force-local"]))
+
+ def test_lxd_cmd_existing(self):
+ data = {"mode": "existing",
+ "name": "testbr0"}
+ self.assertEqual(
+ cc_lxd.bridge_to_cmd(data),
+ (None, ["lxc", "network", "attach-profile",
+ "testbr0", "default", "eth0", "--force-local"]))
+
+ def test_lxd_cmd_none(self):
+ data = {"mode": "none"}
+ self.assertEqual(
+ cc_lxd.bridge_to_cmd(data),
+ (None, None))
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index 57dce1bc..e320dd82 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -1,14 +1,22 @@
from cloudinit.config.cc_snappy import (
makeop, get_package_ops, render_snap_op)
-from cloudinit import util
+from cloudinit.config.cc_snap_config import (
+ add_assertions, add_snap_user, ASSERTIONS_FILE)
+from cloudinit import (distros, helpers, cloud, util)
+from cloudinit.config.cc_snap_config import handle as snap_handle
+from cloudinit.sources import DataSourceNone
+from ..helpers import FilesystemMockingTestCase, mock
from .. import helpers as t_help
+import logging
import os
import shutil
import tempfile
+import textwrap
import yaml
+LOG = logging.getLogger(__name__)
ALLOWED = (dict, list, int, str)
@@ -287,6 +295,289 @@ class TestInstallPackages(t_help.TestCase):
self.assertEqual(yaml.safe_load(mydata), data_found)
+class TestSnapConfig(FilesystemMockingTestCase):
+
+ SYSTEM_USER_ASSERTION = textwrap.dedent("""
+ type: system-user
+ authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
+ brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
+ email: foo@bar.com
+ password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
+ series:
+ - 16
+ since: 2016-09-10T16:34:00+03:00
+ until: 2017-11-10T16:34:00+03:00
+ username: baz
+ sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj
+
+ AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
+ Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
+ zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
+ s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
+ +to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
+ Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
+ d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
+ BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
+ f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
+ v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q==""")
+
+ ACCOUNT_ASSERTION = textwrap.dedent("""
+ type: account-key
+ authority-id: canonical
+ revision: 2
+ public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
+ account-id: canonical
+ name: store
+ since: 2016-04-01T00:00:00.0Z
+ body-length: 717
+ sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH
+
+ AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
+ qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
+ vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
+ UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
+ Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
+ o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
+ VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
+ 2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
+ Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
+ vUvV7RjVzv17ut0AEQEAAQ==
+
+ AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
+ WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
+ nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
+ 3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
+ eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
+ inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
+ rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
+ rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
+ aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
+ 6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
+ haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
+ yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
+ HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
+ skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
+ CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
+ ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
+ qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
+ IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
+ oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k""")
+
+ test_assertions = [ACCOUNT_ASSERTION, SYSTEM_USER_ASSERTION]
+
+ def setUp(self):
+ super(TestSnapConfig, self).setUp()
+ self.subp = util.subp
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ @mock.patch('cloudinit.util.write_file')
+ @mock.patch('cloudinit.util.subp')
+ def test_snap_config_add_assertions(self, msubp, mwrite):
+ add_assertions(self.test_assertions)
+
+ combined = "\n".join(self.test_assertions)
+ mwrite.assert_any_call(ASSERTIONS_FILE, combined.encode('utf-8'))
+ msubp.assert_called_with(['snap', 'ack', ASSERTIONS_FILE],
+ capture=True)
+
+ def test_snap_config_add_assertions_empty(self):
+ self.assertRaises(ValueError, add_assertions, [])
+
+ def test_add_assertions_nonlist(self):
+ self.assertRaises(ValueError, add_assertions, {})
+
+ @mock.patch('cloudinit.util.write_file')
+ @mock.patch('cloudinit.util.subp')
+ def test_snap_config_add_assertions_ack_fails(self, msubp, mwrite):
+ msubp.side_effect = [util.ProcessExecutionError("Invalid assertion")]
+ self.assertRaises(util.ProcessExecutionError, add_assertions,
+ self.test_assertions)
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_no_config(self, mock_util, mock_add):
+ cfg = {}
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ snap_handle('snap_config', cfg, cc, LOG, None)
+ mock_add.assert_not_called()
+
+ def test_snap_config_add_snap_user_no_config(self):
+ usercfg = add_snap_user(cfg=None)
+ self.assertEqual(usercfg, None)
+
+ def test_snap_config_add_snap_user_not_dict(self):
+ cfg = ['foobar']
+ self.assertRaises(ValueError, add_snap_user, cfg)
+
+ def test_snap_config_add_snap_user_no_email(self):
+ cfg = {'assertions': [], 'known': True}
+ usercfg = add_snap_user(cfg=cfg)
+ self.assertEqual(usercfg, None)
+
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_add_snap_user_email_only(self, mock_util):
+ email = 'janet@planetjanet.org'
+ cfg = {'email': email}
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("false\n", ""), # snap managed
+ ]
+
+ usercfg = add_snap_user(cfg=cfg)
+
+ self.assertEqual(usercfg, {'snapuser': email, 'known': False})
+
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_add_snap_user_email_known(self, mock_util):
+ email = 'janet@planetjanet.org'
+ known = True
+ cfg = {'email': email, 'known': known}
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("false\n", ""), # snap managed
+ (self.SYSTEM_USER_ASSERTION, ""), # snap known system-user
+ ]
+
+ usercfg = add_snap_user(cfg=cfg)
+
+ self.assertEqual(usercfg, {'snapuser': email, 'known': known})
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_system_not_snappy(self, mock_util, mock_add):
+ cfg = {'snappy': {'assertions': self.test_assertions}}
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = False
+
+ snap_handle('snap_config', cfg, cc, LOG, None)
+
+ mock_add.assert_not_called()
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_snapuser(self, mock_util, mock_add):
+ email = 'janet@planetjanet.org'
+ cfg = {
+ 'snappy': {
+ 'assertions': self.test_assertions,
+ 'email': email,
+ }
+ }
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("false\n", ""), # snap managed
+ ]
+
+ snap_handle('snap_config', cfg, cc, LOG, None)
+
+ mock_add.assert_called_with(self.test_assertions)
+ usercfg = {'snapuser': email, 'known': False}
+ cc.distro.create_user.assert_called_with(email, **usercfg)
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_snapuser_known(self, mock_util, mock_add):
+ email = 'janet@planetjanet.org'
+ cfg = {
+ 'snappy': {
+ 'assertions': self.test_assertions,
+ 'email': email,
+ 'known': True,
+ }
+ }
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("false\n", ""), # snap managed
+ (self.SYSTEM_USER_ASSERTION, ""), # snap known system-user
+ ]
+
+ snap_handle('snap_config', cfg, cc, LOG, None)
+
+ mock_add.assert_called_with(self.test_assertions)
+ usercfg = {'snapuser': email, 'known': True}
+ cc.distro.create_user.assert_called_with(email, **usercfg)
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_snapuser_known_managed(self, mock_util,
+ mock_add):
+ email = 'janet@planetjanet.org'
+ cfg = {
+ 'snappy': {
+ 'assertions': self.test_assertions,
+ 'email': email,
+ 'known': True,
+ }
+ }
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("true\n", ""), # snap managed
+ ]
+
+ snap_handle('snap_config', cfg, cc, LOG, None)
+
+ mock_add.assert_called_with(self.test_assertions)
+ cc.distro.create_user.assert_not_called()
+
+ @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
+ @mock.patch('cloudinit.config.cc_snap_config.util')
+ def test_snap_config_handle_snapuser_known_no_assertion(self, mock_util,
+ mock_add):
+ email = 'janet@planetjanet.org'
+ cfg = {
+ 'snappy': {
+ 'assertions': [self.ACCOUNT_ASSERTION],
+ 'email': email,
+ 'known': True,
+ }
+ }
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ mock_util.system_is_snappy.return_value = True
+ mock_util.subp.side_effect = [
+ ("true\n", ""), # snap managed
+ ("", ""), # snap known system-user
+ ]
+
+ snap_handle('snap_config', cfg, cc, LOG, None)
+
+ mock_add.assert_called_with([self.ACCOUNT_ASSERTION])
+ cc.distro.create_user.assert_not_called()
+
+
def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
if cfgfile:
cfgfile = os.path.sep.join([tmpd, cfgfile])
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 41b9a6d0..78c080ca 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -53,6 +53,45 @@ DHCP_EXPECTED_1 = {
'dns_nameservers': ['192.168.122.1']}],
}
+DHCP6_CONTENT_1 = """
+DEVICE=eno1
+HOSTNAME=
+DNSDOMAIN=
+reason='PREINIT'
+interface='eno1'
+DEVICE=eno1
+HOSTNAME=
+DNSDOMAIN=
+reason='FAIL'
+interface='eno1'
+DEVICE=eno1
+HOSTNAME=
+DNSDOMAIN=
+reason='PREINIT6'
+interface='eno1'
+DEVICE=eno1
+IPV6PROTO=dhcp6
+IPV6ADDR=2001:67c:1562:8010:0:1::
+IPV6NETMASK=64
+IPV6DNS0=2001:67c:1562:8010::2:1
+IPV6DOMAINSEARCH=
+HOSTNAME=
+DNSDOMAIN=
+reason='BOUND6'
+interface='eno1'
+new_ip6_address='2001:67c:1562:8010:0:1::'
+new_ip6_prefixlen='64'
+new_dhcp6_name_servers='2001:67c:1562:8010::2:1'
+"""
+
+DHCP6_EXPECTED_1 = {
+ 'name': 'eno1',
+ 'type': 'physical',
+ 'subnets': [{'control': 'manual',
+ 'dns_nameservers': ['2001:67c:1562:8010::2:1'],
+ 'netmask': '64',
+ 'type': 'dhcp6'}]}
+
STATIC_CONTENT_1 = """
DEVICE='eth1'
@@ -590,6 +629,10 @@ class TestCmdlineConfigParsing(TestCase):
found = cmdline._klibc_to_config_entry(DHCP_CONTENT_1)
self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
+ def test_cmdline_convert_dhcp6(self):
+ found = cmdline._klibc_to_config_entry(DHCP6_CONTENT_1)
+ self.assertEqual(found, ('eno1', DHCP6_EXPECTED_1))
+
def test_cmdline_convert_static(self):
found = cmdline._klibc_to_config_entry(STATIC_CONTENT_1)
self.assertEqual(found, ('eth1', STATIC_EXPECTED_1))
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index d2031f59..f6a8ab75 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -223,8 +223,10 @@ class TestKeyValStrings(helpers.TestCase):
class TestGetCmdline(helpers.TestCase):
def test_cmdline_reads_debug_env(self):
- os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
- self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
+ with mock.patch.dict("os.environ",
+ values={'DEBUG_PROC_CMDLINE': 'abcd 123'}):
+ ret = util.get_cmdline()
+ self.assertEqual("abcd 123", ret)
class TestLoadYaml(helpers.TestCase):
@@ -384,7 +386,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
dmi_name = 'use-dmidecode'
self._configure_dmidecode_return(dmi_name, dmi_val)
- expected = {'armel': None, 'aarch64': None, 'x86_64': dmi_val}
+ expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val}
found = {}
# we do not run the 'dmi-decode' binary on some arches
# verify that anything requested that is not in the sysfs dir
@@ -516,6 +518,7 @@ class TestSubp(helpers.TestCase):
utf8_invalid = b'ab\xaadef'
utf8_valid = b'start \xc3\xa9 end'
utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
+ printenv = ['bash', '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
def printf_cmd(self, *args):
# bash's printf supports \xaa. So does /usr/bin/printf
@@ -550,7 +553,7 @@ class TestSubp(helpers.TestCase):
def test_subp_decode_invalid_utf8_replaces(self):
(out, _err) = util.subp(self.stdin2out, capture=True,
data=self.utf8_invalid)
- expected = self.utf8_invalid.decode('utf-8', errors='replace')
+ expected = self.utf8_invalid.decode('utf-8', 'replace')
self.assertEqual(out, expected)
def test_subp_decode_strict_raises(self):
@@ -566,6 +569,29 @@ class TestSubp(helpers.TestCase):
self.assertEqual(err, data)
self.assertEqual(out, b'')
+ def test_subp_reads_env(self):
+ with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
+ out, err = util.subp(self.printenv + ['FOO'], capture=True)
+ self.assertEqual('FOO=BAR', out.splitlines()[0])
+
+ def test_subp_env_and_update_env(self):
+ out, err = util.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ env={'FOO': 'BAR'},
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())
+
+ def test_subp_update_env(self):
+ extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
+ with mock.patch.dict("os.environ", values=extra):
+ out, err = util.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
+
def test_returns_none_if_no_capture(self):
(out, err) = util.subp(self.stdin2out, data=b'', capture=False)
self.assertEqual(err, None)
@@ -577,4 +603,12 @@ class TestSubp(helpers.TestCase):
self.assertEqual("/target/my/path/",
util.target_path("/target/", "///my/path/"))
+
+class TestEncode(helpers.TestCase):
+ """Test the encoding functions"""
+ def test_decode_binary_plain_text_with_hex(self):
+ blob = 'BOOTABLE_FLAG=\x80init=/bin/systemd'
+ text = util.decode_binary(blob)
+ self.assertEqual(text, blob)
+
# vi: ts=4 expandtab