summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorScott Moser <smoser@brickies.net>2016-09-09 21:46:49 -0400
committerScott Moser <smoser@brickies.net>2016-09-09 21:46:49 -0400
commitea732e69516983b1d9838b0d80540a832594748a (patch)
treef23cbf03e360f913e98e15d232bcf871770806e8 /tests
parenteb5860ec6ed76a90fb837001ab2ed54e1dcf78de (diff)
parent34a26f7f59f2963691e36ca0476bec9fc9ccef63 (diff)
downloadvyos-cloud-init-ea732e69516983b1d9838b0d80540a832594748a.tar.gz
vyos-cloud-init-ea732e69516983b1d9838b0d80540a832594748a.zip
Merge branch 'master' into ubuntu/xenial
Diffstat (limited to 'tests')
-rw-r--r--tests/configs/sample1.yaml3
-rw-r--r--tests/unittests/helpers.py18
-rw-r--r--tests/unittests/test_atomic_helper.py54
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py15
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py149
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py67
-rw-r--r--tests/unittests/test_datasource/test_maas.py127
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py85
-rw-r--r--tests/unittests/test_datasource/test_openstack.py3
-rw-r--r--tests/unittests/test_datasource/test_smartos.py350
-rw-r--r--tests/unittests/test_distros/test_generic.py3
-rw-r--r--tests/unittests/test_handler/test_handler_apt_conf_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_configure.py)46
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_configure_sources_list.py)64
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py187
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_source.py)214
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v3.py1104
-rw-r--r--tests/unittests/test_handler/test_handler_mcollective.py128
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py274
-rw-r--r--tests/unittests/test_handler/test_handler_spacewalk.py42
-rw-r--r--tests/unittests/test_util.py95
20 files changed, 2799 insertions, 229 deletions
diff --git a/tests/configs/sample1.yaml b/tests/configs/sample1.yaml
index 6231f293..ae935cc0 100644
--- a/tests/configs/sample1.yaml
+++ b/tests/configs/sample1.yaml
@@ -3,9 +3,6 @@
#apt_upgrade: true
packages: [ bzr, pastebinit, ubuntu-dev-tools, ccache, bzr-builddeb, vim-nox, git-core, lftp ]
-#apt_sources:
-# - source: ppa:smoser/ppa
-
#disable_root: False
# mounts:
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 972245df..1cdc05a1 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -252,11 +252,27 @@ class HttprettyTestCase(TestCase):
super(HttprettyTestCase, self).tearDown()
+class TempDirTestCase(TestCase):
+ # provide a tempdir per class, not per test.
+ def setUp(self):
+ super(TempDirTestCase, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def tmp_path(self, path):
+ if path.startswith(os.path.sep):
+ path = "." + path
+
+ return os.path.normpath(os.path.join(self.tmp, path))
+
+
def populate_dir(path, files):
if not os.path.exists(path):
os.makedirs(path)
for (name, content) in files.items():
- with open(os.path.join(path, name), "wb") as fp:
+ p = os.path.join(path, name)
+ util.ensure_dir(os.path.dirname(p))
+ with open(p, "wb") as fp:
if isinstance(content, six.binary_type):
fp.write(content)
else:
diff --git a/tests/unittests/test_atomic_helper.py b/tests/unittests/test_atomic_helper.py
new file mode 100644
index 00000000..feb81551
--- /dev/null
+++ b/tests/unittests/test_atomic_helper.py
@@ -0,0 +1,54 @@
+import json
+import os
+import stat
+
+from cloudinit import atomic_helper
+
+from . import helpers
+
+
+class TestAtomicHelper(helpers.TempDirTestCase):
+ def test_basic_usage(self):
+ """write_file takes bytes if no omode."""
+ path = self.tmp_path("test_basic_usage")
+ contents = b"Hey there\n"
+ atomic_helper.write_file(path, contents)
+ self.check_file(path, contents)
+
+ def test_string(self):
+ """write_file can take a string with mode w."""
+ path = self.tmp_path("test_string")
+ contents = "Hey there\n"
+ atomic_helper.write_file(path, contents, omode="w")
+ self.check_file(path, contents, omode="r")
+
+ def test_file_permissions(self):
+ """write_file with mode 400 works correctly."""
+ path = self.tmp_path("test_file_permissions")
+ contents = b"test_file_perms"
+ atomic_helper.write_file(path, contents, mode=0o400)
+ self.check_file(path, contents, perms=0o400)
+
+ def test_write_json(self):
+ """write_json output is readable json."""
+ path = self.tmp_path("test_write_json")
+ data = {'key1': 'value1', 'key2': ['i1', 'i2']}
+ atomic_helper.write_json(path, data)
+ with open(path, "r") as fp:
+ found = json.load(fp)
+ self.assertEqual(data, found)
+ self.check_perms(path, 0o644)
+
+ def check_file(self, path, content, omode=None, perms=0o644):
+ if omode is None:
+ omode = "rb"
+ self.assertTrue(os.path.exists(path))
+ self.assertTrue(os.path.isfile(path))
+ with open(path, omode) as fp:
+ found = fp.read()
+ self.assertEqual(content, found)
+ self.check_perms(path, perms)
+
+ def check_perms(self, path, perms):
+ file_stat = os.stat(path)
+ self.assertEqual(perms, stat.S_IMODE(file_stat.st_mode))
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 65202ff0..64523e16 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -54,13 +54,17 @@ class TestFindEndpoint(TestCase):
self.load_file = patches.enter_context(
mock.patch.object(azure_helper.util, 'load_file'))
+ self.dhcp_options = patches.enter_context(
+ mock.patch.object(azure_helper.WALinuxAgentShim,
+ '_load_dhclient_json'))
+
def test_missing_file(self):
- self.load_file.side_effect = IOError
- self.assertRaises(IOError,
+ self.assertRaises(ValueError,
azure_helper.WALinuxAgentShim.find_endpoint)
def test_missing_special_azure_line(self):
self.load_file.return_value = ''
+ self.dhcp_options.return_value = {'eth0': {'key': 'value'}}
self.assertRaises(ValueError,
azure_helper.WALinuxAgentShim.find_endpoint)
@@ -72,13 +76,18 @@ class TestFindEndpoint(TestCase):
' option unknown-245 {0};'.format(encoded_address),
'}'])
+ def test_from_dhcp_client(self):
+ self.dhcp_options.return_value = {"eth0": {"unknown_245": "5:4:3:2"}}
+ self.assertEqual('5.4.3.2',
+ azure_helper.WALinuxAgentShim.find_endpoint(None))
+
def test_latest_lease_used(self):
encoded_addresses = ['5:4:3:2', '4:3:2:1']
file_content = '\n'.join([self._build_lease_content(encoded_address)
for encoded_address in encoded_addresses])
self.load_file.return_value = file_content
self.assertEqual(encoded_addresses[-1].replace(':', '.'),
- azure_helper.WALinuxAgentShim.find_endpoint())
+ azure_helper.WALinuxAgentShim.find_endpoint("foobar"))
class TestExtractIpAddressFromLeaseValue(TestCase):
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 18551b92..98ff97a7 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -101,6 +101,98 @@ NETWORK_DATA_2 = {
"type": "vif", "id": "eth1", "vif_id": "vif-foo2"}]
}
+# This network data ha 'tap' type for a link.
+NETWORK_DATA_3 = {
+ "services": [{"type": "dns", "address": "172.16.36.11"},
+ {"type": "dns", "address": "172.16.36.12"}],
+ "networks": [
+ {"network_id": "7c41450c-ba44-401a-9ab1-1604bb2da51e",
+ "type": "ipv4", "netmask": "255.255.255.128",
+ "link": "tap77a0dc5b-72", "ip_address": "172.17.48.18",
+ "id": "network0",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "172.17.48.1"}]},
+ {"network_id": "7c41450c-ba44-401a-9ab1-1604bb2da51e",
+ "type": "ipv6", "netmask": "ffff:ffff:ffff:ffff::",
+ "link": "tap77a0dc5b-72",
+ "ip_address": "fdb8:52d0:9d14:0:f816:3eff:fe9f:70d",
+ "id": "network1",
+ "routes": [{"netmask": "::", "network": "::",
+ "gateway": "fdb8:52d0:9d14::1"}]},
+ {"network_id": "1f53cb0e-72d3-47c7-94b9-ff4397c5fe54",
+ "type": "ipv4", "netmask": "255.255.255.128",
+ "link": "tap7d6b7bec-93", "ip_address": "172.16.48.13",
+ "id": "network2",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "172.16.48.1"},
+ {"netmask": "255.255.0.0", "network": "172.16.0.0",
+ "gateway": "172.16.48.1"}]}],
+ "links": [
+ {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": None,
+ "type": "tap", "id": "tap77a0dc5b-72",
+ "vif_id": "77a0dc5b-720e-41b7-bfa7-1b2ff62e0d48"},
+ {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": None,
+ "type": "tap", "id": "tap7d6b7bec-93",
+ "vif_id": "7d6b7bec-93e6-4c03-869a-ddc5014892d5"}
+ ]
+}
+
+NETWORK_DATA_BOND = {
+ "services": [
+ {"type": "dns", "address": "1.1.1.191"},
+ {"type": "dns", "address": "1.1.1.4"},
+ ],
+ "networks": [
+ {"id": "network2-ipv4", "ip_address": "2.2.2.13",
+ "link": "vlan2", "netmask": "255.255.255.248",
+ "network_id": "4daf5ce8-38cf-4240-9f1a-04e86d7c6117",
+ "type": "ipv4",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "2.2.2.9"}]},
+ {"id": "network3-ipv4", "ip_address": "10.0.1.5",
+ "link": "vlan3", "netmask": "255.255.255.248",
+ "network_id": "a9e2f47c-3c43-4782-94d0-e1eeef1c8c9d",
+ "type": "ipv4",
+ "routes": [{"netmask": "255.255.255.255",
+ "network": "192.168.1.0", "gateway": "10.0.1.1"}]}
+ ],
+ "links": [
+ {"ethernet_mac_address": "0c:c4:7a:34:6e:3c",
+ "id": "eth0", "mtu": 1500, "type": "phy"},
+ {"ethernet_mac_address": "0c:c4:7a:34:6e:3d",
+ "id": "eth1", "mtu": 1500, "type": "phy"},
+ {"bond_links": ["eth0", "eth1"],
+ "bond_miimon": 100, "bond_mode": "4",
+ "bond_xmit_hash_policy": "layer3+4",
+ "ethernet_mac_address": "0c:c4:7a:34:6e:3c",
+ "id": "bond0", "type": "bond"},
+ {"ethernet_mac_address": "fa:16:3e:b3:72:30",
+ "id": "vlan2", "type": "vlan", "vlan_id": 602,
+ "vlan_link": "bond0", "vlan_mac_address": "fa:16:3e:b3:72:30"},
+ {"ethernet_mac_address": "fa:16:3e:66:ab:a6",
+ "id": "vlan3", "type": "vlan", "vlan_id": 612, "vlan_link": "bond0",
+ "vlan_mac_address": "fa:16:3e:66:ab:a6"}
+ ]
+}
+
+NETWORK_DATA_VLAN = {
+ "services": [{"type": "dns", "address": "1.1.1.191"}],
+ "networks": [
+ {"id": "network1-ipv4", "ip_address": "10.0.1.5",
+ "link": "vlan1", "netmask": "255.255.255.248",
+ "network_id": "a9e2f47c-3c43-4782-94d0-e1eeef1c8c9d",
+ "type": "ipv4",
+ "routes": [{"netmask": "255.255.255.255",
+ "network": "192.168.1.0", "gateway": "10.0.1.1"}]}
+ ],
+ "links": [
+ {"ethernet_mac_address": "fa:16:3e:69:b0:58",
+ "id": "eth0", "mtu": 1500, "type": "phy"},
+ {"ethernet_mac_address": "fa:16:3e:b3:72:30",
+ "id": "vlan1", "type": "vlan", "vlan_id": 602,
+ "vlan_link": "eth0", "vlan_mac_address": "fa:16:3e:b3:72:30"},
+ ]
+}
KNOWN_MACS = {
'fa:16:3e:69:b0:58': 'enp0s1',
@@ -108,6 +200,8 @@ KNOWN_MACS = {
'fa:16:3e:dd:50:9a': 'foo1',
'fa:16:3e:a8:14:69': 'foo2',
'fa:16:3e:ed:9a:59': 'foo3',
+ '0c:c4:7a:34:6e:3d': 'oeth1',
+ '0c:c4:7a:34:6e:3c': 'oeth0',
}
CFG_DRIVE_FILES_V2 = {
@@ -555,6 +649,61 @@ class TestConvertNetworkData(TestCase):
eni_rendering = f.read()
self.assertIn("route add default gw 2.2.2.9", eni_rendering)
+ def test_conversion_with_tap(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA_3,
+ known_macs=KNOWN_MACS)
+ physicals = set()
+ for i in ncfg['config']:
+ if i.get('type') == "physical":
+ physicals.add(i['name'])
+ self.assertEqual(physicals, set(('foo1', 'foo2')))
+
+ def test_bond_conversion(self):
+ # light testing of bond conversion and eni rendering of bond
+ ncfg = openstack.convert_net_json(NETWORK_DATA_BOND,
+ known_macs=KNOWN_MACS)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+
+ # Verify there are expected interfaces in the net config.
+ interfaces = sorted(
+ [i['name'] for i in ncfg['config']
+ if i['type'] in ('vlan', 'bond', 'physical')])
+ self.assertEqual(
+ sorted(["oeth0", "oeth1", "bond0", "bond0.602", "bond0.612"]),
+ interfaces)
+
+ words = eni_rendering.split()
+ # 'eth0' and 'eth1' are the ids. because their mac adresses
+ # map to other names, we should not see them in the ENI
+ self.assertNotIn('eth0', words)
+ self.assertNotIn('eth1', words)
+
+ # oeth0 and oeth1 are the interface names for eni.
+ # bond0 will be generated for the bond. Each should be auto.
+ self.assertIn("auto oeth0", eni_rendering)
+ self.assertIn("auto oeth1", eni_rendering)
+ self.assertIn("auto bond0", eni_rendering)
+
+ def test_vlan(self):
+ # light testing of vlan config conversion and eni rendering
+ ncfg = openstack.convert_net_json(NETWORK_DATA_VLAN,
+ known_macs=KNOWN_MACS)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+
+ self.assertIn("iface enp0s1", eni_rendering)
+ self.assertIn("address 10.0.1.5", eni_rendering)
+ self.assertIn("auto enp0s1.602", eni_rendering)
+
def cfg_ds_from_dir(seed_d):
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index 8936a1e3..f5d2ef35 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -15,68 +15,58 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-
-from six.moves.urllib_parse import urlparse
+import json
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceDigitalOcean
from .. import helpers as test_helpers
+from ..helpers import HttprettyTestCase
httpretty = test_helpers.import_httpretty()
-# Abbreviated for the test
-DO_INDEX = """id
- hostname
- user-data
- vendor-data
- public-keys
- region"""
-
-DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com
- ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com"""
-DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com"
+DO_MULTIPLE_KEYS = ["ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@do.co",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@do.co"]
+DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... test@do.co"
DO_META = {
- '': DO_INDEX,
- 'user-data': '#!/bin/bash\necho "user-data"',
- 'vendor-data': '#!/bin/bash\necho "vendor-data"',
- 'public-keys': DO_SINGLE_KEY,
+ 'user_data': 'user_data_here',
+ 'vendor_data': 'vendor_data_here',
+ 'public_keys': DO_SINGLE_KEY,
'region': 'nyc3',
'id': '2000000',
'hostname': 'cloudinit-test',
}
-MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*')
+MD_URL = 'http://169.254.169.254/metadata/v1.json'
+
+
+def _mock_dmi():
+ return (True, DO_META.get('id'))
def _request_callback(method, uri, headers):
- url_path = urlparse(uri).path
- if url_path.startswith('/metadata/v1/'):
- path = url_path.split('/metadata/v1/')[1:][0]
- else:
- path = None
- if path in DO_META:
- return (200, headers, DO_META.get(path))
- else:
- return (404, headers, '')
+ return (200, headers, json.dumps(DO_META))
-class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
+class TestDataSourceDigitalOcean(HttprettyTestCase):
+ """
+ Test reading the meta-data
+ """
def setUp(self):
self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
settings.CFG_BUILTIN, None,
helpers.Paths({}))
+ self.ds._get_sysinfo = _mock_dmi
super(TestDataSourceDigitalOcean, self).setUp()
@httpretty.activate
def test_connection(self):
httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_request_callback)
+ httpretty.GET, MD_URL,
+ body=json.dumps(DO_META))
success = self.ds.get_data()
self.assertTrue(success)
@@ -84,14 +74,14 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
@httpretty.activate
def test_metadata(self):
httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
+ httpretty.GET, MD_URL,
body=_request_callback)
self.ds.get_data()
- self.assertEqual(DO_META.get('user-data'),
+ self.assertEqual(DO_META.get('user_data'),
self.ds.get_userdata_raw())
- self.assertEqual(DO_META.get('vendor-data'),
+ self.assertEqual(DO_META.get('vendor_data'),
self.ds.get_vendordata_raw())
self.assertEqual(DO_META.get('region'),
@@ -103,11 +93,8 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual(DO_META.get('hostname'),
self.ds.get_hostname())
- self.assertEqual('http://mirrors.digitalocean.com/',
- self.ds.get_package_mirror_info())
-
# Single key
- self.assertEqual([DO_META.get('public-keys')],
+ self.assertEqual([DO_META.get('public_keys')],
self.ds.get_public_ssh_keys())
self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
@@ -116,12 +103,12 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
def test_multiple_ssh_keys(self):
DO_META['public_keys'] = DO_MULTIPLE_KEYS
httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
+ httpretty.GET, MD_URL,
body=_request_callback)
self.ds.get_data()
# Multiple keys
- self.assertEqual(DO_META.get('public-keys').splitlines(),
+ self.assertEqual(DO_META.get('public_keys'),
self.ds.get_public_ssh_keys())
self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index f66f1c6d..0126c883 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -2,6 +2,7 @@ from copy import copy
import os
import shutil
import tempfile
+import yaml
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
@@ -24,41 +25,44 @@ class TestMAASDataSource(TestCase):
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
- data = {'instance-id': 'i-valid01',
- 'local-hostname': 'valid01-hostname',
- 'user-data': b'valid01-userdata',
+ userdata = b'valid01-userdata'
+ data = {'meta-data/instance-id': 'i-valid01',
+ 'meta-data/local-hostname': 'valid01-hostname',
+ 'user-data': userdata,
'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
my_d = os.path.join(self.tmp, "valid")
populate_dir(my_d, data)
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
+ ud, md, vd = DataSourceMAAS.read_maas_seed_dir(my_d)
- self.assertEqual(userdata, data['user-data'])
+ self.assertEqual(userdata, ud)
for key in ('instance-id', 'local-hostname'):
- self.assertEqual(data[key], metadata[key])
+ self.assertEqual(data["meta-data/" + key], md[key])
# verify that 'userdata' is not returned as part of the metadata
- self.assertFalse(('user-data' in metadata))
+ self.assertFalse(('user-data' in md))
+ self.assertEqual(vd, None)
def test_seed_dir_valid_extra(self):
"""Verify extra files do not affect seed_dir validity."""
- data = {'instance-id': 'i-valid-extra',
- 'local-hostname': 'valid-extra-hostname',
- 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
+ userdata = b'valid-extra-userdata'
+ data = {'meta-data/instance-id': 'i-valid-extra',
+ 'meta-data/local-hostname': 'valid-extra-hostname',
+ 'user-data': userdata, 'foo': 'bar'}
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
+ ud, md, vd = DataSourceMAAS.read_maas_seed_dir(my_d)
- self.assertEqual(userdata, data['user-data'])
+ self.assertEqual(userdata, ud)
for key in ('instance-id', 'local-hostname'):
- self.assertEqual(data[key], metadata[key])
+ self.assertEqual(data['meta-data/' + key], md[key])
# additional files should not just appear as keys in metadata atm
- self.assertFalse(('foo' in metadata))
+ self.assertFalse(('foo' in md))
def test_seed_dir_invalid(self):
"""Verify that invalid seed_dir raises MAASSeedDirMalformed."""
@@ -97,67 +101,60 @@ class TestMAASDataSource(TestCase):
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
+ def mock_read_maas_seed_url(self, data, seed, version="19991231"):
+ """mock up readurl to appear as a web server at seed has provided data.
+ return what read_maas_seed_url returns."""
+ def my_readurl(*args, **kwargs):
+ if len(args):
+ url = args[0]
+ else:
+ url = kwargs['url']
+ prefix = "%s/%s/" % (seed, version)
+ if not url.startswith(prefix):
+ raise ValueError("unexpected call %s" % url)
+
+ short = url[len(prefix):]
+ if short not in data:
+ raise url_helper.UrlError("not found", code=404, url=url)
+ return url_helper.StringResponse(data[short])
+
+ # Now do the actual call of the code under test.
+ with mock.patch("cloudinit.url_helper.readurl") as mock_readurl:
+ mock_readurl.side_effect = my_readurl
+ return DataSourceMAAS.read_maas_seed_url(seed, version=version)
+
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
valid = {
'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
+ 'meta-data/vendor-data': b'my-vendordata',
'user-data': b'foodata',
}
- valid_order = [
- 'meta-data/local-hostname',
- 'meta-data/instance-id',
- 'meta-data/public-keys',
- 'user-data',
- ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
- my_headers = {'header1': 'value1', 'header2': 'value2'}
-
- def my_headers_cb(url):
- return my_headers
-
- # Each time url_helper.readurl() is called, something different is
- # returned based on the canned data above. We need to build up a list
- # of side effect return values, which the mock will return. At the
- # same time, we'll build up a list of expected call arguments for
- # asserting after the code under test is run.
- calls = []
-
- def side_effect():
- for key in valid_order:
- resp = valid.get(key)
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- calls.append(
- mock.call(url, headers=None, timeout=mock.ANY,
- data=mock.ANY, sec_between=mock.ANY,
- ssl_details=mock.ANY, retries=mock.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mock.ANY))
- yield url_helper.StringResponse(resp)
-
- # Now do the actual call of the code under test.
- with mock.patch.object(url_helper, 'readurl',
- side_effect=side_effect()) as mockobj:
- userdata, metadata = DataSourceMAAS.read_maas_seed_url(
- my_seed, version=my_ver)
-
- self.assertEqual(b"foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
-
- mockobj.has_calls(calls)
-
- def test_seed_url_invalid(self):
- """Verify that invalid seed_url raises MAASSeedDirMalformed."""
- pass
-
- def test_seed_url_missing(self):
- """Verify seed_url with no found entries raises MAASSeedDirNone."""
- pass
+ ud, md, vd = self.mock_read_maas_seed_url(valid, my_seed, my_ver)
+
+ self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
+ self.assertEqual(
+ valid['meta-data/local-hostname'], md['local-hostname'])
+ self.assertEqual(valid['meta-data/public-keys'], md['public-keys'])
+ self.assertEqual(valid['user-data'], ud)
+ # vendor-data is yaml, which decodes a string
+ self.assertEqual(valid['meta-data/vendor-data'].decode(), vd)
+
+ def test_seed_url_vendor_data_dict(self):
+ expected_vd = {'key1': 'value1'}
+ valid = {
+ 'meta-data/instance-id': 'i-instanceid',
+ 'meta-data/local-hostname': 'test-hostname',
+ 'meta-data/vendor-data': yaml.safe_dump(expected_vd).encode(),
+ }
+ ud, md, vd = self.mock_read_maas_seed_url(
+ valid, "http://example.com/foo")
+ self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
+ self.assertEqual(expected_vd, vd)
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index b0fa1130..f6a46ce9 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -6,7 +6,7 @@ from ..helpers import TestCase, populate_dir, mock, ExitStack
import os
import shutil
import tempfile
-
+import textwrap
import yaml
@@ -129,6 +129,89 @@ class TestNoCloudDataSource(TestCase):
self.assertFalse(dsrc.vendordata)
self.assertTrue(ret)
+ def test_metadata_network_interfaces(self):
+ gateway = "103.225.10.1"
+ md = {
+ 'instance-id': 'i-abcd',
+ 'local-hostname': 'hostname1',
+ 'network-interfaces': textwrap.dedent("""\
+ auto eth0
+ iface eth0 inet static
+ hwaddr 00:16:3e:70:e1:04
+ address 103.225.10.12
+ netmask 255.255.255.0
+ gateway """ + gateway + """
+ dns-servers 8.8.8.8""")}
+
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': b"ud",
+ 'meta-data': yaml.dump(md) + "\n"})
+
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ # very simple check just for the strings above
+ self.assertIn(gateway, str(dsrc.network_config))
+
+ def test_metadata_network_config(self):
+ # network-config needs to get into network_config
+ netconf = {'version': 1,
+ 'config': [{'type': 'physical', 'name': 'interface0',
+ 'subnets': [{'type': 'dhcp'}]}]}
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': b"ud",
+ 'meta-data': "instance-id: IID\n",
+ 'network-config': yaml.dump(netconf) + "\n"})
+
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
+
+ def test_metadata_network_config_over_interfaces(self):
+ # network-config should override meta-data/network-interfaces
+ gateway = "103.225.10.1"
+ md = {
+ 'instance-id': 'i-abcd',
+ 'local-hostname': 'hostname1',
+ 'network-interfaces': textwrap.dedent("""\
+ auto eth0
+ iface eth0 inet static
+ hwaddr 00:16:3e:70:e1:04
+ address 103.225.10.12
+ netmask 255.255.255.0
+ gateway """ + gateway + """
+ dns-servers 8.8.8.8""")}
+
+ netconf = {'version': 1,
+ 'config': [{'type': 'physical', 'name': 'interface0',
+ 'subnets': [{'type': 'dhcp'}]}]}
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': b"ud",
+ 'meta-data': yaml.dump(md) + "\n",
+ 'network-config': yaml.dump(netconf) + "\n"})
+
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
+ self.assertNotIn(gateway, str(dsrc.network_config))
+
class TestParseCommandLineData(TestCase):
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 5c8592c5..97b99a18 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -27,6 +27,7 @@ from six import StringIO
from cloudinit import helpers
from cloudinit import settings
+from cloudinit.sources import convert_vendordata
from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
@@ -318,7 +319,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
class TestVendorDataLoading(test_helpers.TestCase):
def cvj(self, data):
- return openstack.convert_vendordata_json(data)
+ return convert_vendordata(data)
def test_vd_load_none(self):
# non-existant vendor-data should return none
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 9c6c8768..0532f986 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -36,6 +36,8 @@ import uuid
from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.sources.DataSourceSmartOS import (
+ convert_smartos_network_data as convert_net)
import six
@@ -86,6 +88,229 @@ SDC_NICS = json.loads("""
]
""")
+
+SDC_NICS_ALT = json.loads("""
+[
+ {
+ "interface": "net0",
+ "mac": "90:b8:d0:ae:64:51",
+ "vlan_id": 324,
+ "nic_tag": "external",
+ "gateway": "8.12.42.1",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.51",
+ "ips": [
+ "8.12.42.51/24"
+ ],
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model": "virtio",
+ "mtu": 1500,
+ "primary": true
+ },
+ {
+ "interface": "net1",
+ "mac": "90:b8:d0:bd:4f:9c",
+ "vlan_id": 600,
+ "nic_tag": "internal",
+ "netmask": "255.255.255.0",
+ "ip": "10.210.1.217",
+ "ips": [
+ "10.210.1.217/24"
+ ],
+ "network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model": "virtio",
+ "mtu": 1500
+ }
+]
+""")
+
+SDC_NICS_DHCP = json.loads("""
+[
+ {
+ "interface": "net0",
+ "mac": "90:b8:d0:ae:64:51",
+ "vlan_id": 324,
+ "nic_tag": "external",
+ "gateway": "8.12.42.1",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.51",
+ "ips": [
+ "8.12.42.51/24"
+ ],
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model": "virtio",
+ "mtu": 1500,
+ "primary": true
+ },
+ {
+ "interface": "net1",
+ "mac": "90:b8:d0:bd:4f:9c",
+ "vlan_id": 600,
+ "nic_tag": "internal",
+ "netmask": "255.255.255.0",
+ "ip": "10.210.1.217",
+ "ips": [
+ "dhcp"
+ ],
+ "network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model": "virtio",
+ "mtu": 1500
+ }
+]
+""")
+
+SDC_NICS_MIP = json.loads("""
+[
+ {
+ "interface": "net0",
+ "mac": "90:b8:d0:ae:64:51",
+ "vlan_id": 324,
+ "nic_tag": "external",
+ "gateway": "8.12.42.1",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.51",
+ "ips": [
+ "8.12.42.51/24",
+ "8.12.42.52/24"
+ ],
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model": "virtio",
+ "mtu": 1500,
+ "primary": true
+ },
+ {
+ "interface": "net1",
+ "mac": "90:b8:d0:bd:4f:9c",
+ "vlan_id": 600,
+ "nic_tag": "internal",
+ "netmask": "255.255.255.0",
+ "ip": "10.210.1.217",
+ "ips": [
+ "10.210.1.217/24",
+ "10.210.1.151/24"
+ ],
+ "network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model": "virtio",
+ "mtu": 1500
+ }
+]
+""")
+
+SDC_NICS_MIP_IPV6 = json.loads("""
+[
+ {
+ "interface": "net0",
+ "mac": "90:b8:d0:ae:64:51",
+ "vlan_id": 324,
+ "nic_tag": "external",
+ "gateway": "8.12.42.1",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.51",
+ "ips": [
+ "2001:4800:78ff:1b:be76:4eff:fe06:96b3/64",
+ "8.12.42.51/24"
+ ],
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model": "virtio",
+ "mtu": 1500,
+ "primary": true
+ },
+ {
+ "interface": "net1",
+ "mac": "90:b8:d0:bd:4f:9c",
+ "vlan_id": 600,
+ "nic_tag": "internal",
+ "netmask": "255.255.255.0",
+ "ip": "10.210.1.217",
+ "ips": [
+ "10.210.1.217/24"
+ ],
+ "network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model": "virtio",
+ "mtu": 1500
+ }
+]
+""")
+
+SDC_NICS_IPV4_IPV6 = json.loads("""
+[
+ {
+ "interface": "net0",
+ "mac": "90:b8:d0:ae:64:51",
+ "vlan_id": 324,
+ "nic_tag": "external",
+ "gateway": "8.12.42.1",
+ "gateways": ["8.12.42.1", "2001::1", "2001::2"],
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.51",
+ "ips": ["2001::10/64", "8.12.42.51/24", "2001::11/64",
+ "8.12.42.52/32"],
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model": "virtio",
+ "mtu": 1500,
+ "primary": true
+ },
+ {
+ "interface": "net1",
+ "mac": "90:b8:d0:bd:4f:9c",
+ "vlan_id": 600,
+ "nic_tag": "internal",
+ "netmask": "255.255.255.0",
+ "ip": "10.210.1.217",
+ "ips": ["10.210.1.217/24"],
+ "gateways": ["10.210.1.210"],
+ "network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model": "virtio",
+ "mtu": 1500
+ }
+]
+""")
+
+SDC_NICS_SINGLE_GATEWAY = json.loads("""
+[
+ {
+ "interface":"net0",
+ "mac":"90:b8:d0:d8:82:b4",
+ "vlan_id":324,
+ "nic_tag":"external",
+ "gateway":"8.12.42.1",
+ "gateways":["8.12.42.1"],
+ "netmask":"255.255.255.0",
+ "ip":"8.12.42.26",
+ "ips":["8.12.42.26/24"],
+ "network_uuid":"992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "model":"virtio",
+ "mtu":1500,
+ "primary":true
+ },
+ {
+ "interface":"net1",
+ "mac":"90:b8:d0:0a:51:31",
+ "vlan_id":600,
+ "nic_tag":"internal",
+ "netmask":"255.255.255.0",
+ "ip":"10.210.1.27",
+ "ips":["10.210.1.27/24"],
+ "network_uuid":"98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
+ "model":"virtio",
+ "mtu":1500
+ }
+]
+""")
+
+
MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
@@ -524,20 +749,135 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
class TestNetworkConversion(TestCase):
-
def test_convert_simple(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'netmask': '255.255.255.0',
'address': '8.12.42.102/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
{'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '192.168.128.1',
- 'netmask': '255.255.252.0',
+ 'subnets': [{'type': 'static',
'address': '192.168.128.93/22'}],
'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
- found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS)
+ found = convert_net(SDC_NICS)
+ self.assertEqual(expected, found)
+
+ def test_convert_simple_alt(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'address': '8.12.42.51/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static',
+ 'address': '10.210.1.217/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ found = convert_net(SDC_NICS_ALT)
+ self.assertEqual(expected, found)
+
+ def test_convert_simple_dhcp(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'address': '8.12.42.51/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'dhcp4'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ found = convert_net(SDC_NICS_DHCP)
+ self.assertEqual(expected, found)
+
+ def test_convert_simple_multi_ip(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'address': '8.12.42.51/24'},
+ {'type': 'static',
+ 'address': '8.12.42.52/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static',
+ 'address': '10.210.1.217/24'},
+ {'type': 'static',
+ 'address': '10.210.1.151/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ found = convert_net(SDC_NICS_MIP)
+ self.assertEqual(expected, found)
+
+ def test_convert_with_dns(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'address': '8.12.42.51/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'dhcp4'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'},
+ {'type': 'nameserver',
+ 'address': ['8.8.8.8', '8.8.8.1'], 'search': ["local"]}]}
+ found = convert_net(
+ network_data=SDC_NICS_DHCP, dns_servers=['8.8.8.8', '8.8.8.1'],
+ dns_domain="local")
+ self.assertEqual(expected, found)
+
+ def test_convert_simple_multi_ipv6(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'address':
+ '2001:4800:78ff:1b:be76:4eff:fe06:96b3/64'},
+ {'type': 'static', 'gateway': '8.12.42.1',
+ 'address': '8.12.42.51/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static',
+ 'address': '10.210.1.217/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ found = convert_net(SDC_NICS_MIP_IPV6)
+ self.assertEqual(expected, found)
+
+ def test_convert_simple_both_ipv4_ipv6(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'mac_address': '90:b8:d0:ae:64:51', 'mtu': 1500,
+ 'name': 'net0', 'type': 'physical',
+ 'subnets': [{'address': '2001::10/64', 'gateway': '2001::1',
+ 'type': 'static'},
+ {'address': '8.12.42.51/24',
+ 'gateway': '8.12.42.1',
+ 'type': 'static'},
+ {'address': '2001::11/64', 'type': 'static'},
+ {'address': '8.12.42.52/32', 'type': 'static'}]},
+ {'mac_address': '90:b8:d0:bd:4f:9c', 'mtu': 1500,
+ 'name': 'net1', 'type': 'physical',
+ 'subnets': [{'address': '10.210.1.217/24',
+ 'type': 'static'}]}]}
+ found = convert_net(SDC_NICS_IPV4_IPV6)
+ self.assertEqual(expected, found)
+
+ def test_gateways_not_on_all_nics(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500,
+ 'name': 'net0', 'type': 'physical',
+ 'subnets': [{'address': '8.12.42.26/24',
+ 'gateway': '8.12.42.1', 'type': 'static'}]},
+ {'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500,
+ 'name': 'net1', 'type': 'physical',
+ 'subnets': [{'address': '10.210.1.27/24',
+ 'type': 'static'}]}]}
+ found = convert_net(SDC_NICS_SINGLE_GATEWAY)
self.assertEqual(expected, found)
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index 96fa0811..24ad115f 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -226,8 +226,5 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
os.symlink('/', '/run/systemd/system')
self.assertFalse(d.uses_systemd())
-# def _get_package_mirror_info(mirror_info, availability_zone=None,
-# mirror_filter=util.search_for_mirror):
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
index d1dca2c4..45714efd 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
@@ -3,6 +3,7 @@ from cloudinit import util
from ..helpers import TestCase
+import copy
import os
import re
import shutil
@@ -27,7 +28,7 @@ class TestAptProxyConfig(TestCase):
contents, flags=re.IGNORECASE)
def test_apt_proxy_written(self):
- cfg = {'apt_proxy': 'myproxy'}
+ cfg = {'proxy': 'myproxy'}
cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
@@ -37,7 +38,7 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_http_proxy_written(self):
- cfg = {'apt_http_proxy': 'myproxy'}
+ cfg = {'http_proxy': 'myproxy'}
cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
@@ -47,13 +48,13 @@ class TestAptProxyConfig(TestCase):
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_all_proxy_written(self):
- cfg = {'apt_http_proxy': 'myproxy_http_proxy',
- 'apt_https_proxy': 'myproxy_https_proxy',
- 'apt_ftp_proxy': 'myproxy_ftp_proxy'}
+ cfg = {'http_proxy': 'myproxy_http_proxy',
+ 'https_proxy': 'myproxy_https_proxy',
+ 'ftp_proxy': 'myproxy_ftp_proxy'}
- values = {'http': cfg['apt_http_proxy'],
- 'https': cfg['apt_https_proxy'],
- 'ftp': cfg['apt_ftp_proxy'],
+ values = {'http': cfg['http_proxy'],
+ 'https': cfg['https_proxy'],
+ 'ftp': cfg['ftp_proxy'],
}
cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
@@ -74,7 +75,7 @@ class TestAptProxyConfig(TestCase):
def test_proxy_replaced(self):
util.write_file(self.cfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
+ cc_apt_configure.apply_apt_config({'proxy': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
contents = load_tfile_or_url(self.pfile)
@@ -82,7 +83,7 @@ class TestAptProxyConfig(TestCase):
def test_config_written(self):
payload = 'this is my apt config'
- cfg = {'apt_config': payload}
+ cfg = {'conf': payload}
cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
@@ -93,17 +94,38 @@ class TestAptProxyConfig(TestCase):
def test_config_replaced(self):
util.write_file(self.pfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({'apt_config': "foo"},
+ cc_apt_configure.apply_apt_config({'conf': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.cfile))
self.assertEqual(load_tfile_or_url(self.cfile), "foo")
def test_config_deleted(self):
- # if no 'apt_config' is provided, delete any previously written file
+ # if no 'conf' is provided, delete any previously written file
util.write_file(self.pfile, "content doesnt matter")
cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile)
self.assertFalse(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
+class TestConversion(TestCase):
+ def test_convert_with_apt_mirror_as_empty_string(self):
+ # an empty apt_mirror is the same as no apt_mirror
+ empty_m_found = cc_apt_configure.convert_to_v3_apt_format(
+ {'apt_mirror': ''})
+ default_found = cc_apt_configure.convert_to_v3_apt_format({})
+ self.assertEqual(default_found, empty_m_found)
+
+ def test_convert_with_apt_mirror(self):
+ mirror = 'http://my.mirror/ubuntu'
+ f = cc_apt_configure.convert_to_v3_apt_format({'apt_mirror': mirror})
+ self.assertIn(mirror, {m['uri'] for m in f['apt']['primary']})
+
+ def test_no_old_content(self):
+ mirror = 'http://my.mirror/ubuntu'
+ mydata = {'apt': {'primary': {'arches': ['default'], 'uri': mirror}}}
+ expected = copy.deepcopy(mydata)
+ self.assertEqual(expected,
+ cc_apt_configure.convert_to_v3_apt_format(mydata))
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py
index acde0863..f4411869 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py
@@ -79,6 +79,15 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
self.new_root = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.new_root)
+ rpatcher = mock.patch("cloudinit.util.lsb_release")
+ get_rel = rpatcher.start()
+ get_rel.return_value = {'codename': "fakerelease"}
+ self.addCleanup(rpatcher.stop)
+ apatcher = mock.patch("cloudinit.util.get_architecture")
+ get_arch = apatcher.start()
+ get_arch.return_value = 'amd64'
+ self.addCleanup(apatcher.stop)
+
def _get_cloud(self, distro, metadata=None):
self.patchUtils(self.new_root)
paths = helpers.Paths({})
@@ -102,25 +111,38 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
cfg = {'apt_mirror': mirror}
mycloud = self._get_cloud(distro)
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile',
- return_value=True) as mockisfile:
- with mock.patch.object(util, 'rename'):
- cc_apt_configure.handle("notimportant", cfg, mycloud,
- LOG, None)
+ with mock.patch.object(util, 'write_file') as mockwf:
+ with mock.patch.object(util, 'load_file',
+ return_value="faketmpl") as mocklf:
+ with mock.patch.object(os.path, 'isfile',
+ return_value=True) as mockisfile:
+ with mock.patch.object(templater, 'render_string',
+ return_value="fake") as mockrnd:
+ with mock.patch.object(util, 'rename'):
+ cc_apt_configure.handle("test", cfg, mycloud,
+ LOG, None)
mockisfile.assert_any_call(
('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/sources.list.%s.tmpl' % distro),
- '/etc/apt/sources.list',
- {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck})
-
- def test_apt_source_list_debian(self):
+ mocklf.assert_any_call(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
+ mockrnd.assert_called_once_with('faketmpl',
+ {'RELEASE': 'fakerelease',
+ 'PRIMARY': mirrorcheck,
+ 'MIRROR': mirrorcheck,
+ 'SECURITY': mirrorcheck,
+ 'codename': 'fakerelease',
+ 'primary': mirrorcheck,
+ 'mirror': mirrorcheck,
+ 'security': mirrorcheck})
+ mockwf.assert_called_once_with('/etc/apt/sources.list', 'fake',
+ mode=0o644)
+
+ def test_apt_v1_source_list_debian(self):
"""Test rendering of a source.list from template for debian"""
self.apt_source_list('debian', 'http://httpredir.debian.org/debian')
- def test_apt_source_list_ubuntu(self):
+ def test_apt_v1_source_list_ubuntu(self):
"""Test rendering of a source.list from template for ubuntu"""
self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/')
@@ -134,7 +156,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
print("Faking SUCCESS for '%s'" % name)
return True
- def test_apt_srcl_debian_mirrorfail(self):
+ def test_apt_v1_srcl_debian_mirrorfail(self):
"""Test rendering of a source.list from template for debian"""
with mock.patch.object(util, 'is_resolvable',
side_effect=self.myresolve) as mockresolve:
@@ -145,7 +167,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
mockresolve.assert_any_call("does.not.exist")
mockresolve.assert_any_call("httpredir.debian.org")
- def test_apt_srcl_ubuntu_mirrorfail(self):
+ def test_apt_v1_srcl_ubuntu_mirrorfail(self):
"""Test rendering of a source.list from template for ubuntu"""
with mock.patch.object(util, 'is_resolvable',
side_effect=self.myresolve) as mockresolve:
@@ -156,7 +178,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
mockresolve.assert_any_call("does.not.exist")
mockresolve.assert_any_call("archive.ubuntu.com")
- def test_apt_srcl_custom(self):
+ def test_apt_v1_srcl_custom(self):
"""Test rendering from a custom source.list template"""
cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL)
mycloud = self._get_cloud('ubuntu')
@@ -164,12 +186,10 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
# the second mock restores the original subp
with mock.patch.object(util, 'write_file') as mockwrite:
with mock.patch.object(util, 'subp', self.subp):
- with mock.patch.object(cc_apt_configure, 'get_release',
- return_value='fakerelease'):
- with mock.patch.object(Distro, 'get_primary_arch',
- return_value='amd64'):
- cc_apt_configure.handle("notimportant", cfg, mycloud,
- LOG, None)
+ with mock.patch.object(Distro, 'get_primary_arch',
+ return_value='amd64'):
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
mockwrite.assert_called_once_with(
'/etc/apt/sources.list',
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
new file mode 100644
index 00000000..e53b0450
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
@@ -0,0 +1,187 @@
+""" test_apt_custom_sources_list
+Test templating of custom sources list
+"""
+import logging
+import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+from mock import call
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.config import cc_apt_configure
+from cloudinit.sources import DataSourceNone
+
+from cloudinit.distros.debian import Distro
+
+from .. import helpers as t_help
+
+LOG = logging.getLogger(__name__)
+
+TARGET = "/"
+
+# Input and expected output for the custom template
+YAML_TEXT_CUSTOM_SL = """
+apt:
+ primary:
+ - arches: [default]
+ uri: http://test.ubuntu.com/ubuntu/
+ security:
+ - arches: [default]
+ uri: http://testsec.ubuntu.com/ubuntu/
+ sources_list: |
+
+ # Note, this file is written by cloud-init at install time. It should not
+ # end up on the installed system itself.
+ # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+ # newer versions of the distribution.
+ deb $MIRROR $RELEASE main restricted
+ deb-src $MIRROR $RELEASE main restricted
+ deb $PRIMARY $RELEASE universe restricted
+ deb $SECURITY $RELEASE-security multiverse
+ # FIND_SOMETHING_SPECIAL
+"""
+
+EXPECTED_CONVERTED_CONTENT = """
+# Note, this file is written by cloud-init at install time. It should not
+# end up on the installed system itself.
+# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+# newer versions of the distribution.
+deb http://test.ubuntu.com/ubuntu/ fakerel main restricted
+deb-src http://test.ubuntu.com/ubuntu/ fakerel main restricted
+deb http://test.ubuntu.com/ubuntu/ fakerel universe restricted
+deb http://testsec.ubuntu.com/ubuntu/ fakerel-security multiverse
+# FIND_SOMETHING_SPECIAL
+"""
+
+# mocked to be independent to the unittest system
+MOCKED_APT_SRC_LIST = """
+deb http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted
+deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted
+"""
+
+EXPECTED_BASE_CONTENT = ("""
+deb http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted
+deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted
+""")
+
+EXPECTED_MIRROR_CONTENT = ("""
+deb http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted
+deb http://test.ubuntu.com/ubuntu/ notouched-security main restricted
+""")
+
+EXPECTED_PRIMSEC_CONTENT = ("""
+deb http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted
+deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted
+deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted
+""")
+
+
+class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
+ """TestAptSourceConfigSourceList - Class to test sources list rendering"""
+ def setUp(self):
+ super(TestAptSourceConfigSourceList, self).setUp()
+ self.subp = util.subp
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+
+ rpatcher = mock.patch("cloudinit.util.lsb_release")
+ get_rel = rpatcher.start()
+ get_rel.return_value = {'codename': "fakerel"}
+ self.addCleanup(rpatcher.stop)
+ apatcher = mock.patch("cloudinit.util.get_architecture")
+ get_arch = apatcher.start()
+ get_arch.return_value = 'amd64'
+ self.addCleanup(apatcher.stop)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def _apt_source_list(self, cfg, expected, distro):
+ "_apt_source_list - Test rendering from template (generic)"
+
+ # entry at top level now, wrap in 'apt' key
+ cfg = {'apt': cfg}
+ mycloud = self._get_cloud(distro)
+ with mock.patch.object(util, 'write_file') as mockwf:
+ with mock.patch.object(util, 'load_file',
+ return_value=MOCKED_APT_SRC_LIST) as mocklf:
+ with mock.patch.object(os.path, 'isfile',
+ return_value=True) as mockisfile:
+ with mock.patch.object(util, 'rename'):
+ cc_apt_configure.handle("test", cfg, mycloud,
+ LOG, None)
+
+ # check if it would have loaded the distro template
+ mockisfile.assert_any_call(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
+ mocklf.assert_any_call(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
+ # check expected content in result
+ mockwf.assert_called_once_with('/etc/apt/sources.list', expected,
+ mode=0o644)
+
+ def test_apt_v3_source_list_debian(self):
+ """test_apt_v3_source_list_debian - without custom sources or parms"""
+ cfg = {}
+ self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'debian')
+
+ def test_apt_v3_source_list_ubuntu(self):
+ """test_apt_v3_source_list_ubuntu - without custom sources or parms"""
+ cfg = {}
+ self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'ubuntu')
+
+ def test_apt_v3_source_list_psm(self):
+ """test_apt_v3_source_list_psm - Test specifying prim+sec mirrors"""
+ pm = 'http://test.ubuntu.com/ubuntu/'
+ sm = 'http://testsec.ubuntu.com/ubuntu/'
+ cfg = {'preserve_sources_list': False,
+ 'primary': [{'arches': ["default"],
+ 'uri': pm}],
+ 'security': [{'arches': ["default"],
+ 'uri': sm}]}
+
+ self._apt_source_list(cfg, EXPECTED_PRIMSEC_CONTENT, 'ubuntu')
+
+ def test_apt_v3_srcl_custom(self):
+ """test_apt_v3_srcl_custom - Test rendering a custom source template"""
+ cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL)
+ mycloud = self._get_cloud('ubuntu')
+
+ # the second mock restores the original subp
+ with mock.patch.object(util, 'write_file') as mockwrite:
+ with mock.patch.object(util, 'subp', self.subp):
+ with mock.patch.object(Distro, 'get_primary_arch',
+ return_value='amd64'):
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
+
+ calls = [call('/etc/apt/sources.list',
+ EXPECTED_CONVERTED_CONTENT,
+ mode=0o644)]
+ mockwrite.assert_has_calls(calls)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py
index 99a4d860..ddff4341 100644
--- a/tests/unittests/test_handler/test_handler_apt_source.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py
@@ -1,5 +1,7 @@
-""" test_handler_apt_source
+""" test_handler_apt_source_v1
Testing various config variations of the apt_source config
+This calls all things with v1 format to stress the conversion code on top of
+the actually tested code.
"""
import os
import re
@@ -32,6 +34,8 @@ S0ORP6HXET3+jC8BMG4tBWCTK/XEZw==
=ACB2
-----END PGP PUBLIC KEY BLOCK-----"""
+ADD_APT_REPO_MATCH = r"^[\w-]+:\w"
+
def load_tfile_or_url(*args, **kwargs):
"""load_tfile_or_url
@@ -40,6 +44,19 @@ def load_tfile_or_url(*args, **kwargs):
return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+class FakeDistro(object):
+ """Fake Distro helper object"""
+ def update_package_sources(self):
+ """Fake update_package_sources helper method"""
+ return
+
+
+class FakeCloud(object):
+ """Fake Cloud helper object"""
+ def __init__(self):
+ self.distro = FakeDistro()
+
+
class TestAptSourceConfig(TestCase):
"""TestAptSourceConfig
Main Class to test apt_source configs
@@ -54,25 +71,39 @@ class TestAptSourceConfig(TestCase):
self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list")
self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
self.join = os.path.join
+ self.matcher = re.compile(ADD_APT_REPO_MATCH).search
# mock fallback filename into writable tmp dir
self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/",
"cloud_config_sources.list")
- patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release")
- get_rel = patcher.start()
- get_rel.return_value = self.release
- self.addCleanup(patcher.stop)
+ self.fakecloud = FakeCloud()
+
+ rpatcher = mock.patch("cloudinit.util.lsb_release")
+ get_rel = rpatcher.start()
+ get_rel.return_value = {'codename': self.release}
+ self.addCleanup(rpatcher.stop)
+ apatcher = mock.patch("cloudinit.util.get_architecture")
+ get_arch = apatcher.start()
+ get_arch.return_value = 'amd64'
+ self.addCleanup(apatcher.stop)
- @staticmethod
- def _get_default_params():
+ def _get_default_params(self):
"""get_default_params
Get the most basic default mrror and release info to be used in tests
"""
params = {}
- params['RELEASE'] = cc_apt_configure.get_release()
+ params['RELEASE'] = self.release
params['MIRROR'] = "http://archive.ubuntu.com/ubuntu"
return params
+ def wrapv1conf(self, cfg):
+ params = self._get_default_params()
+ # old v1 list format under old keys, but callabe to main handler
+ # disable source.list rendering and set mirror to avoid other code
+ return {'apt_preserve_sources_list': True,
+ 'apt_mirror': params['MIRROR'],
+ 'apt_sources': cfg}
+
def myjoin(self, *args, **kwargs):
"""myjoin - redir into writable tmpdir"""
if (args[0] == "/etc/apt/sources.list.d/" and
@@ -86,9 +117,9 @@ class TestAptSourceConfig(TestCase):
"""apt_src_basic
Test Fix deb source string, has to overwrite mirror conf in params
"""
- params = self._get_default_params()
+ cfg = self.wrapv1conf(cfg)
- cc_apt_configure.add_apt_sources(cfg, params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
self.assertTrue(os.path.isfile(filename))
@@ -181,8 +212,9 @@ class TestAptSourceConfig(TestCase):
"""apt_src_replace
Test Autoreplacement of MIRROR and RELEASE in source specs
"""
+ cfg = self.wrapv1conf(cfg)
params = self._get_default_params()
- cc_apt_configure.add_apt_sources(cfg, params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
self.assertTrue(os.path.isfile(filename))
@@ -246,16 +278,18 @@ class TestAptSourceConfig(TestCase):
"""apt_src_keyid
Test specification of a source + keyid
"""
- params = self._get_default_params()
+ cfg = self.wrapv1conf(cfg)
with mock.patch.object(util, 'subp',
return_value=('fakekey 1234', '')) as mockobj:
- cc_apt_configure.add_apt_sources(cfg, params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
# check if it added the right ammount of keys
calls = []
for _ in range(keynum):
- calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234'))
+ calls.append(call(['apt-key', 'add', '-'],
+ data=b'fakekey 1234',
+ target=None))
mockobj.assert_has_calls(calls, any_order=True)
self.assertTrue(os.path.isfile(filename))
@@ -329,12 +363,13 @@ class TestAptSourceConfig(TestCase):
"""apt_src_key
Test specification of a source + key
"""
- params = self._get_default_params()
+ cfg = self.wrapv1conf([cfg])
with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
- mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321')
+ mockobj.assert_called_with(['apt-key', 'add', '-'],
+ data=b'fakekey 4321', target=None)
self.assertTrue(os.path.isfile(filename))
@@ -368,30 +403,31 @@ class TestAptSourceConfig(TestCase):
def test_apt_src_keyonly(self):
"""Test specifying key without source"""
- params = self._get_default_params()
cfg = {'key': "fakekey 4242",
'filename': self.aptlistfile}
+ cfg = self.wrapv1conf([cfg])
with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
- mockobj.assert_called_once_with(('apt-key', 'add', '-'),
- 'fakekey 4242')
+ mockobj.assert_called_once_with(['apt-key', 'add', '-'],
+ data=b'fakekey 4242', target=None)
# filename should be ignored on key only
self.assertFalse(os.path.isfile(self.aptlistfile))
def test_apt_src_keyidonly(self):
"""Test specification of a keyid without source"""
- params = self._get_default_params()
cfg = {'keyid': "03683F77",
'filename': self.aptlistfile}
+ cfg = self.wrapv1conf([cfg])
with mock.patch.object(util, 'subp',
return_value=('fakekey 1212', '')) as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
- mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212')
+ mockobj.assert_called_with(['apt-key', 'add', '-'],
+ data=b'fakekey 1212', target=None)
# filename should be ignored on key only
self.assertFalse(os.path.isfile(self.aptlistfile))
@@ -402,17 +438,18 @@ class TestAptSourceConfig(TestCase):
up to addition of the key (add_apt_key_raw mocked to keep the
environment as is)
"""
- params = self._get_default_params()
+ key = cfg['keyid']
+ keyserver = cfg.get('keyserver', 'keyserver.ubuntu.com')
+ cfg = self.wrapv1conf([cfg])
with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey:
- with mock.patch.object(gpg, 'get_key_by_id',
+ with mock.patch.object(gpg, 'getkeybyid',
return_value=expectedkey) as mockgetkey:
- cc_apt_configure.add_apt_sources([cfg], params)
+ cc_apt_configure.handle("test", cfg, self.fakecloud,
+ None, None)
- mockgetkey.assert_called_with(cfg['keyid'],
- cfg.get('keyserver',
- 'keyserver.ubuntu.com'))
- mockkey.assert_called_with(expectedkey)
+ mockgetkey.assert_called_with(key, keyserver)
+ mockkey.assert_called_with(expectedkey, None)
# filename should be ignored on key only
self.assertFalse(os.path.isfile(self.aptlistfile))
@@ -444,41 +481,38 @@ class TestAptSourceConfig(TestCase):
def test_apt_src_ppa(self):
"""Test adding a ppa"""
- params = self._get_default_params()
cfg = {'source': 'ppa:smoser/cloud-init-test',
'filename': self.aptlistfile}
-
- # default matcher needed for ppa
- matcher = re.compile(r'^[\w-]+:\w').search
+ cfg = self.wrapv1conf([cfg])
with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params,
- aa_repo_match=matcher)
+ cc_apt_configure.handle("test", cfg, self.fakecloud, None, None)
mockobj.assert_called_once_with(['add-apt-repository',
- 'ppa:smoser/cloud-init-test'])
+ 'ppa:smoser/cloud-init-test'],
+ target=None)
# adding ppa should ignore filename (uses add-apt-repository)
self.assertFalse(os.path.isfile(self.aptlistfile))
def test_apt_src_ppa_tri(self):
"""Test adding three ppa's"""
- params = self._get_default_params()
cfg1 = {'source': 'ppa:smoser/cloud-init-test',
'filename': self.aptlistfile}
cfg2 = {'source': 'ppa:smoser/cloud-init-test2',
'filename': self.aptlistfile2}
cfg3 = {'source': 'ppa:smoser/cloud-init-test3',
'filename': self.aptlistfile3}
-
- # default matcher needed for ppa
- matcher = re.compile(r'^[\w-]+:\w').search
+ cfg = self.wrapv1conf([cfg1, cfg2, cfg3])
with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params,
- aa_repo_match=matcher)
- calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']),
- call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']),
- call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])]
+ cc_apt_configure.handle("test", cfg, self.fakecloud,
+ None, None)
+ calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'],
+ target=None),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'],
+ target=None),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'],
+ target=None)]
mockobj.assert_has_calls(calls, any_order=True)
# adding ppa should ignore all filenames (uses add-apt-repository)
@@ -494,6 +528,7 @@ class TestAptSourceConfig(TestCase):
'filename': self.aptlistfile2}
cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
'filename': self.aptlistfile3}
+ cfg = {'apt_sources': [cfg1, cfg2, cfg3]}
checkcfg = {self.aptlistfile: {'filename': self.aptlistfile,
'source': 'deb $MIRROR $RELEASE '
'multiverse'},
@@ -503,14 +538,89 @@ class TestAptSourceConfig(TestCase):
'source': 'deb $MIRROR $RELEASE '
'universe'}}
- newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3])
- self.assertEqual(newcfg, checkcfg)
+ newcfg = cc_apt_configure.convert_to_v3_apt_format(cfg)
+ self.assertEqual(newcfg['apt']['sources'], checkcfg)
+
+ # convert again, should stay the same
+ newcfg2 = cc_apt_configure.convert_to_v3_apt_format(newcfg)
+ self.assertEqual(newcfg2['apt']['sources'], checkcfg)
- newcfg2 = cc_apt_configure.convert_to_new_format(newcfg)
- self.assertEqual(newcfg2, checkcfg)
+ # should work without raising an exception
+ cc_apt_configure.convert_to_v3_apt_format({})
+
+ with self.assertRaises(ValueError):
+ cc_apt_configure.convert_to_v3_apt_format({'apt_sources': 5})
+
+ def test_convert_to_new_format_collision(self):
+ """Test the conversion of old to new format with collisions
+ That matches e.g. the MAAS case specifying old and new config"""
+ cfg_1_and_3 = {'apt': {'proxy': 'http://192.168.122.1:8000/'},
+ 'apt_proxy': 'http://192.168.122.1:8000/'}
+ cfg_3_only = {'apt': {'proxy': 'http://192.168.122.1:8000/'}}
+ cfgconflict = {'apt': {'proxy': 'http://192.168.122.1:8000/'},
+ 'apt_proxy': 'ftp://192.168.122.1:8000/'}
+
+ # collision (equal)
+ newcfg = cc_apt_configure.convert_to_v3_apt_format(cfg_1_and_3)
+ self.assertEqual(newcfg, cfg_3_only)
+ # collision (equal, so ok to remove)
+ newcfg = cc_apt_configure.convert_to_v3_apt_format(cfg_3_only)
+ self.assertEqual(newcfg, cfg_3_only)
+ # collision (unequal)
+ with self.assertRaises(ValueError):
+ cc_apt_configure.convert_to_v3_apt_format(cfgconflict)
+ def test_convert_to_new_format_dict_collision(self):
+ cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
+ 'filename': self.aptlistfile3}
+ fullv3 = {self.aptlistfile: {'filename': self.aptlistfile,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'multiverse'},
+ self.aptlistfile2: {'filename': self.aptlistfile2,
+ 'source': 'deb $MIRROR $RELEASE main'},
+ self.aptlistfile3: {'filename': self.aptlistfile3,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'universe'}}
+ cfg_3_only = {'apt': {'sources': fullv3}}
+ cfg_1_and_3 = {'apt_sources': [cfg1, cfg2, cfg3]}
+ cfg_1_and_3.update(cfg_3_only)
+
+ # collision (equal, so ok to remove)
+ newcfg = cc_apt_configure.convert_to_v3_apt_format(cfg_1_and_3)
+ self.assertEqual(newcfg, cfg_3_only)
+ # no old spec (same result)
+ newcfg = cc_apt_configure.convert_to_v3_apt_format(cfg_3_only)
+ self.assertEqual(newcfg, cfg_3_only)
+
+ diff = {self.aptlistfile: {'filename': self.aptlistfile,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'DIFFERENTVERSE'},
+ self.aptlistfile2: {'filename': self.aptlistfile2,
+ 'source': 'deb $MIRROR $RELEASE main'},
+ self.aptlistfile3: {'filename': self.aptlistfile3,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'universe'}}
+ cfg_3_only = {'apt': {'sources': diff}}
+ cfg_1_and_3_different = {'apt_sources': [cfg1, cfg2, cfg3]}
+ cfg_1_and_3_different.update(cfg_3_only)
+
+ # collision (unequal by dict having a different entry)
+ with self.assertRaises(ValueError):
+ cc_apt_configure.convert_to_v3_apt_format(cfg_1_and_3_different)
+
+ missing = {self.aptlistfile: {'filename': self.aptlistfile,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'multiverse'}}
+ cfg_3_only = {'apt': {'sources': missing}}
+ cfg_1_and_3_missing = {'apt_sources': [cfg1, cfg2, cfg3]}
+ cfg_1_and_3_missing.update(cfg_3_only)
+ # collision (unequal by dict missing an entry)
with self.assertRaises(ValueError):
- cc_apt_configure.convert_to_new_format(5)
+ cc_apt_configure.convert_to_v3_apt_format(cfg_1_and_3_missing)
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py
new file mode 100644
index 00000000..b92a50d7
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py
@@ -0,0 +1,1104 @@
+"""test_handler_apt_source_v3
+Testing various config variations of the apt_source custom config
+This tries to call all in the new v3 format and cares about new features
+"""
+import glob
+import os
+import re
+import shutil
+import socket
+import tempfile
+
+from unittest import TestCase
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+from mock import call
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import gpg
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.config import cc_apt_configure
+from cloudinit.sources import DataSourceNone
+
+from .. import helpers as t_help
+
+EXPECTEDKEY = u"""-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ
+NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy
+8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0
+HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG
+CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29
+OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ
+FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm
+S0ORP6HXET3+jC8BMG4tBWCTK/XEZw==
+=ACB2
+-----END PGP PUBLIC KEY BLOCK-----"""
+
+ADD_APT_REPO_MATCH = r"^[\w-]+:\w"
+
+TARGET = None
+
+
+def load_tfile(*args, **kwargs):
+ """load_tfile_or_url
+ load file and return content after decoding
+ """
+ return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+
+
+class TestAptSourceConfig(t_help.FilesystemMockingTestCase):
+ """TestAptSourceConfig
+ Main Class to test apt configs
+ """
+ def setUp(self):
+ super(TestAptSourceConfig, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.aptlistfile = os.path.join(self.tmp, "single-deb.list")
+ self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list")
+ self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
+ self.join = os.path.join
+ self.matcher = re.compile(ADD_APT_REPO_MATCH).search
+
+ @staticmethod
+ def _add_apt_sources(*args, **kwargs):
+ with mock.patch.object(cc_apt_configure, 'update_packages'):
+ cc_apt_configure.add_apt_sources(*args, **kwargs)
+
+ @staticmethod
+ def _get_default_params():
+ """get_default_params
+ Get the most basic default mrror and release info to be used in tests
+ """
+ params = {}
+ params['RELEASE'] = util.lsb_release()['codename']
+ arch = 'amd64'
+ params['MIRROR'] = cc_apt_configure.\
+ get_default_mirrors(arch)["PRIMARY"]
+ return params
+
+ def _myjoin(self, *args, **kwargs):
+ """_myjoin - redir into writable tmpdir"""
+ if (args[0] == "/etc/apt/sources.list.d/" and
+ args[1] == "cloud_config_sources.list" and
+ len(args) == 2):
+ return self.join(self.tmp, args[0].lstrip("/"), args[1])
+ else:
+ return self.join(*args, **kwargs)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def _apt_src_basic(self, filename, cfg):
+ """_apt_src_basic
+ Test Fix deb source string, has to overwrite mirror conf in params
+ """
+ params = self._get_default_params()
+
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://test.ubuntu.com/ubuntu",
+ "karmic-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_basic(self):
+ """test_apt_v3_src_basic - Test fix deb source string"""
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://test.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')}}
+ self._apt_src_basic(self.aptlistfile, cfg)
+
+ def test_apt_v3_src_basic_tri(self):
+ """test_apt_v3_src_basic_tri - Test multiple fix deb source strings"""
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://test.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile2: {'source':
+ ('deb http://test.ubuntu.com/ubuntu'
+ ' precise-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile3: {'source':
+ ('deb http://test.ubuntu.com/ubuntu'
+ ' lucid-backports'
+ ' main universe multiverse restricted')}}
+ self._apt_src_basic(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ contents = load_tfile(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://test.ubuntu.com/ubuntu",
+ "precise-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://test.ubuntu.com/ubuntu",
+ "lucid-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def _apt_src_replacement(self, filename, cfg):
+ """apt_src_replace
+ Test Autoreplacement of MIRROR and RELEASE in source specs
+ """
+ params = self._get_default_params()
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_replace(self):
+ """test_apt_v3_src_replace - Test replacement of MIRROR & RELEASE"""
+ cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}}
+ self._apt_src_replacement(self.aptlistfile, cfg)
+
+ def test_apt_v3_src_replace_fn(self):
+ """test_apt_v3_src_replace_fn - Test filename overwritten in dict"""
+ cfg = {'ignored': {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}}
+ # second file should overwrite the dict key
+ self._apt_src_replacement(self.aptlistfile, cfg)
+
+ def _apt_src_replace_tri(self, cfg):
+ """_apt_src_replace_tri
+ Test three autoreplacements of MIRROR and RELEASE in source specs with
+ generic part
+ """
+ self._apt_src_replacement(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ params = self._get_default_params()
+ contents = load_tfile(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "main"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "universe"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_replace_tri(self):
+ """test_apt_v3_src_replace_tri - Test multiple replace/overwrites"""
+ cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'},
+ 'notused': {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2},
+ self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}}
+ self._apt_src_replace_tri(cfg)
+
+ def _apt_src_keyid(self, filename, cfg, keynum):
+ """_apt_src_keyid
+ Test specification of a source + keyid
+ """
+ params = self._get_default_params()
+
+ with mock.patch("cloudinit.util.subp",
+ return_value=('fakekey 1234', '')) as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ # check if it added the right ammount of keys
+ calls = []
+ for _ in range(keynum):
+ calls.append(call(['apt-key', 'add', '-'], data=b'fakekey 1234',
+ target=TARGET))
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_keyid(self):
+ """test_apt_v3_src_keyid - Test source + keyid with filename"""
+ cfg = {self.aptlistfile: {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77"}}
+ self._apt_src_keyid(self.aptlistfile, cfg, 1)
+
+ def test_apt_v3_src_keyid_tri(self):
+ """test_apt_v3_src_keyid_tri - Test multiple src+key+filen writes"""
+ cfg = {self.aptlistfile: {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77"},
+ 'ignored': {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial universe'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile2},
+ self.aptlistfile3: {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial multiverse'),
+ 'keyid': "03683F77"}}
+
+ self._apt_src_keyid(self.aptlistfile, cfg, 3)
+ contents = load_tfile(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "universe"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_key(self):
+ """test_apt_v3_src_key - Test source + key"""
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'key': "fakekey 4321"}}
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4321',
+ target=TARGET)
+
+ self.assertTrue(os.path.isfile(self.aptlistfile))
+
+ contents = load_tfile(self.aptlistfile)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_v3_src_keyonly(self):
+ """test_apt_v3_src_keyonly - Test key without source"""
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'key': "fakekey 4242"}}
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4242',
+ target=TARGET)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_v3_src_keyidonly(self):
+ """test_apt_v3_src_keyidonly - Test keyid without source"""
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'keyid': "03683F77"}}
+
+ with mock.patch.object(util, 'subp',
+ return_value=('fakekey 1212', '')) as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 1212',
+ target=TARGET)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def apt_src_keyid_real(self, cfg, expectedkey):
+ """apt_src_keyid_real
+ Test specification of a keyid without source including
+ up to addition of the key (add_apt_key_raw mocked to keep the
+ environment as is)
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey:
+ with mock.patch.object(gpg, 'getkeybyid',
+ return_value=expectedkey) as mockgetkey:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ keycfg = cfg[self.aptlistfile]
+ mockgetkey.assert_called_with(keycfg['keyid'],
+ keycfg.get('keyserver',
+ 'keyserver.ubuntu.com'))
+ mockkey.assert_called_with(expectedkey, TARGET)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_v3_src_keyid_real(self):
+ """test_apt_v3_src_keyid_real - Test keyid including key add"""
+ keyid = "03683F77"
+ cfg = {self.aptlistfile: {'keyid': keyid}}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_v3_src_longkeyid_real(self):
+ """test_apt_v3_src_longkeyid_real Test long keyid including key add"""
+ keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
+ cfg = {self.aptlistfile: {'keyid': keyid}}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_v3_src_longkeyid_ks_real(self):
+ """test_apt_v3_src_longkeyid_ks_real Test long keyid from other ks"""
+ keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
+ cfg = {self.aptlistfile: {'keyid': keyid,
+ 'keyserver': 'keys.gnupg.net'}}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_v3_src_keyid_keyserver(self):
+ """test_apt_v3_src_keyid_keyserver - Test custom keyserver"""
+ keyid = "03683F77"
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'keyid': keyid,
+ 'keyserver': 'test.random.com'}}
+
+ # in some test environments only *.ubuntu.com is reachable
+ # so mock the call and check if the config got there
+ with mock.patch.object(gpg, 'getkeybyid',
+ return_value="fakekey") as mockgetkey:
+ with mock.patch.object(cc_apt_configure,
+ 'add_apt_key_raw') as mockadd:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+
+ mockgetkey.assert_called_with('03683F77', 'test.random.com')
+ mockadd.assert_called_with('fakekey', TARGET)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_v3_src_ppa(self):
+ """test_apt_v3_src_ppa - Test specification of a ppa"""
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'}}
+
+ with mock.patch("cloudinit.util.subp") as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+ mockobj.assert_any_call(['add-apt-repository',
+ 'ppa:smoser/cloud-init-test'], target=TARGET)
+
+ # adding ppa should ignore filename (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_v3_src_ppa_tri(self):
+ """test_apt_v3_src_ppa_tri - Test specification of multiple ppa's"""
+ params = self._get_default_params()
+ cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'},
+ self.aptlistfile2: {'source': 'ppa:smoser/cloud-init-test2'},
+ self.aptlistfile3: {'source': 'ppa:smoser/cloud-init-test3'}}
+
+ with mock.patch("cloudinit.util.subp") as mockobj:
+ self._add_apt_sources(cfg, TARGET, template_params=params,
+ aa_repo_match=self.matcher)
+ calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'],
+ target=TARGET),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'],
+ target=TARGET),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'],
+ target=TARGET)]
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ # adding ppa should ignore all filenames (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+ self.assertFalse(os.path.isfile(self.aptlistfile2))
+ self.assertFalse(os.path.isfile(self.aptlistfile3))
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture")
+ def test_apt_v3_list_rename(self, m_get_architecture):
+ """test_apt_v3_list_rename - Test find mirror and apt list renaming"""
+ pre = "/var/lib/apt/lists"
+ # filenames are archive dependent
+
+ arch = 's390x'
+ m_get_architecture.return_value = arch
+ component = "ubuntu-ports"
+ archive = "ports.ubuntu.com"
+
+ cfg = {'primary': [{'arches': ["default"],
+ 'uri':
+ 'http://test.ubuntu.com/%s/' % component}],
+ 'security': [{'arches': ["default"],
+ 'uri':
+ 'http://testsec.ubuntu.com/%s/' % component}]}
+ post = ("%s_dists_%s-updates_InRelease" %
+ (component, util.lsb_release()['codename']))
+ fromfn = ("%s/%s_%s" % (pre, archive, post))
+ tofn = ("%s/test.ubuntu.com_%s" % (pre, post))
+
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch)
+
+ self.assertEqual(mirrors['MIRROR'],
+ "http://test.ubuntu.com/%s/" % component)
+ self.assertEqual(mirrors['PRIMARY'],
+ "http://test.ubuntu.com/%s/" % component)
+ self.assertEqual(mirrors['SECURITY'],
+ "http://testsec.ubuntu.com/%s/" % component)
+
+ with mock.patch.object(os, 'rename') as mockren:
+ with mock.patch.object(glob, 'glob',
+ return_value=[fromfn]):
+ cc_apt_configure.rename_apt_lists(mirrors, TARGET)
+
+ mockren.assert_any_call(fromfn, tofn)
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture")
+ def test_apt_v3_list_rename_non_slash(self, m_get_architecture):
+ target = os.path.join(self.tmp, "rename_non_slash")
+ apt_lists_d = os.path.join(target, "./" + cc_apt_configure.APT_LISTS)
+
+ m_get_architecture.return_value = 'amd64'
+
+ mirror_path = "some/random/path/"
+ primary = "http://test.ubuntu.com/" + mirror_path
+ security = "http://test-security.ubuntu.com/" + mirror_path
+ mirrors = {'PRIMARY': primary, 'SECURITY': security}
+
+ # these match default archive prefixes
+ opri_pre = "archive.ubuntu.com_ubuntu_dists_xenial"
+ osec_pre = "security.ubuntu.com_ubuntu_dists_xenial"
+ # this one won't match and should not be renamed defaults.
+ other_pre = "dl.google.com_linux_chrome_deb_dists_stable"
+ # these are our new expected prefixes
+ npri_pre = "test.ubuntu.com_some_random_path_dists_xenial"
+ nsec_pre = "test-security.ubuntu.com_some_random_path_dists_xenial"
+
+ files = [
+ # orig prefix, new prefix, suffix
+ (opri_pre, npri_pre, "_main_binary-amd64_Packages"),
+ (opri_pre, npri_pre, "_main_binary-amd64_InRelease"),
+ (opri_pre, npri_pre, "-updates_main_binary-amd64_Packages"),
+ (opri_pre, npri_pre, "-updates_main_binary-amd64_InRelease"),
+ (other_pre, other_pre, "_main_binary-amd64_Packages"),
+ (other_pre, other_pre, "_Release"),
+ (other_pre, other_pre, "_Release.gpg"),
+ (osec_pre, nsec_pre, "_InRelease"),
+ (osec_pre, nsec_pre, "_main_binary-amd64_Packages"),
+ (osec_pre, nsec_pre, "_universe_binary-amd64_Packages"),
+ ]
+
+ expected = sorted([npre + suff for opre, npre, suff in files])
+ # create files
+ for (opre, npre, suff) in files:
+ fpath = os.path.join(apt_lists_d, opre + suff)
+ util.write_file(fpath, content=fpath)
+
+ cc_apt_configure.rename_apt_lists(mirrors, target)
+ found = sorted(os.listdir(apt_lists_d))
+ self.assertEqual(expected, found)
+
+ @staticmethod
+ def test_apt_v3_proxy():
+ """test_apt_v3_proxy - Test apt_*proxy configuration"""
+ cfg = {"proxy": "foobar1",
+ "http_proxy": "foobar2",
+ "ftp_proxy": "foobar3",
+ "https_proxy": "foobar4"}
+
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_apt_configure.apply_apt_config(cfg, "proxyfn", "notused")
+
+ mockobj.assert_called_with('proxyfn',
+ ('Acquire::http::Proxy "foobar1";\n'
+ 'Acquire::http::Proxy "foobar2";\n'
+ 'Acquire::ftp::Proxy "foobar3";\n'
+ 'Acquire::https::Proxy "foobar4";\n'))
+
+ def test_apt_v3_mirror(self):
+ """test_apt_v3_mirror - Test defining a mirror"""
+ pmir = "http://us.archive.ubuntu.com/ubuntu/"
+ smir = "http://security.ubuntu.com/ubuntu/"
+ cfg = {"primary": [{'arches': ["default"],
+ "uri": pmir}],
+ "security": [{'arches': ["default"],
+ "uri": smir}]}
+
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64')
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+ def test_apt_v3_mirror_default(self):
+ """test_apt_v3_mirror_default - Test without defining a mirror"""
+ arch = 'amd64'
+ default_mirrors = cc_apt_configure.get_default_mirrors(arch)
+ pmir = default_mirrors["PRIMARY"]
+ smir = default_mirrors["SECURITY"]
+ mycloud = self._get_cloud('ubuntu')
+ mirrors = cc_apt_configure.find_apt_mirror_info({}, mycloud, arch)
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+ def test_apt_v3_mirror_arches(self):
+ """test_apt_v3_mirror_arches - Test arches selection of mirror"""
+ pmir = "http://my-primary.ubuntu.com/ubuntu/"
+ smir = "http://my-security.ubuntu.com/ubuntu/"
+ arch = 'ppc64el'
+ cfg = {"primary": [{'arches': ["default"], "uri": "notthis-primary"},
+ {'arches': [arch], "uri": pmir}],
+ "security": [{'arches': ["default"], "uri": "nothis-security"},
+ {'arches': [arch], "uri": smir}]}
+
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch)
+
+ self.assertEqual(mirrors['PRIMARY'], pmir)
+ self.assertEqual(mirrors['MIRROR'], pmir)
+ self.assertEqual(mirrors['SECURITY'], smir)
+
+ def test_apt_v3_mirror_arches_default(self):
+ """test_apt_v3_mirror_arches - Test falling back to default arch"""
+ pmir = "http://us.archive.ubuntu.com/ubuntu/"
+ smir = "http://security.ubuntu.com/ubuntu/"
+ cfg = {"primary": [{'arches': ["default"],
+ "uri": pmir},
+ {'arches': ["thisarchdoesntexist"],
+ "uri": "notthis"}],
+ "security": [{'arches': ["thisarchdoesntexist"],
+ "uri": "nothat"},
+ {'arches': ["default"],
+ "uri": smir}]}
+
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64')
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture")
+ def test_apt_v3_get_def_mir_non_intel_no_arch(self, m_get_architecture):
+ arch = 'ppc64el'
+ m_get_architecture.return_value = arch
+ expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports',
+ 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'}
+ self.assertEqual(expected, cc_apt_configure.get_default_mirrors())
+
+ def test_apt_v3_get_default_mirrors_non_intel_with_arch(self):
+ found = cc_apt_configure.get_default_mirrors('ppc64el')
+
+ expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports',
+ 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'}
+ self.assertEqual(expected, found)
+
+ def test_apt_v3_mirror_arches_sysdefault(self):
+ """test_apt_v3_mirror_arches - Test arches fallback to sys default"""
+ arch = 'amd64'
+ default_mirrors = cc_apt_configure.get_default_mirrors(arch)
+ pmir = default_mirrors["PRIMARY"]
+ smir = default_mirrors["SECURITY"]
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {"primary": [{'arches': ["thisarchdoesntexist_64"],
+ "uri": "notthis"},
+ {'arches': ["thisarchdoesntexist"],
+ "uri": "notthiseither"}],
+ "security": [{'arches': ["thisarchdoesntexist"],
+ "uri": "nothat"},
+ {'arches': ["thisarchdoesntexist_64"],
+ "uri": "nothateither"}]}
+
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch)
+
+ self.assertEqual(mirrors['MIRROR'], pmir)
+ self.assertEqual(mirrors['PRIMARY'], pmir)
+ self.assertEqual(mirrors['SECURITY'], smir)
+
+ def test_apt_v3_mirror_search(self):
+ """test_apt_v3_mirror_search - Test searching mirrors in a list
+ mock checks to avoid relying on network connectivity"""
+ pmir = "http://us.archive.ubuntu.com/ubuntu/"
+ smir = "http://security.ubuntu.com/ubuntu/"
+ cfg = {"primary": [{'arches': ["default"],
+ "search": ["pfailme", pmir]}],
+ "security": [{'arches': ["default"],
+ "search": ["sfailme", smir]}]}
+
+ with mock.patch.object(cc_apt_configure, 'search_for_mirror',
+ side_effect=[pmir, smir]) as mocksearch:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None,
+ 'amd64')
+
+ calls = [call(["pfailme", pmir]),
+ call(["sfailme", smir])]
+ mocksearch.assert_has_calls(calls)
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+ def test_apt_v3_mirror_search_many2(self):
+ """test_apt_v3_mirror_search_many3 - Test both mirrors specs at once"""
+ pmir = "http://us.archive.ubuntu.com/ubuntu/"
+ smir = "http://security.ubuntu.com/ubuntu/"
+ cfg = {"primary": [{'arches': ["default"],
+ "uri": pmir,
+ "search": ["pfailme", "foo"]}],
+ "security": [{'arches': ["default"],
+ "uri": smir,
+ "search": ["sfailme", "bar"]}]}
+
+ arch = 'amd64'
+
+ # should be called only once per type, despite two mirror configs
+ mycloud = None
+ with mock.patch.object(cc_apt_configure, 'get_mirror',
+ return_value="http://mocked/foo") as mockgm:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch)
+ calls = [call(cfg, 'primary', arch, mycloud),
+ call(cfg, 'security', arch, mycloud)]
+ mockgm.assert_has_calls(calls)
+
+ # should not be called, since primary is specified
+ with mock.patch.object(cc_apt_configure,
+ 'search_for_mirror') as mockse:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch)
+ mockse.assert_not_called()
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+ def test_apt_v3_url_resolvable(self):
+ """test_apt_v3_url_resolvable - Test resolving urls"""
+
+ with mock.patch.object(util, 'is_resolvable') as mockresolve:
+ util.is_resolvable_url("http://1.2.3.4/ubuntu")
+ mockresolve.assert_called_with("1.2.3.4")
+
+ with mock.patch.object(util, 'is_resolvable') as mockresolve:
+ util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu")
+ mockresolve.assert_called_with("us.archive.ubuntu.com")
+
+ # former tests can leave this set (or not if the test is ran directly)
+ # do a hard reset to ensure a stable result
+ util._DNS_REDIRECT_IP = None
+ bad = [(None, None, None, "badname", ["10.3.2.1"])]
+ good = [(None, None, None, "goodname", ["10.2.3.4"])]
+ with mock.patch.object(socket, 'getaddrinfo',
+ side_effect=[bad, bad, bad, good,
+ good]) as mocksock:
+ ret = util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu")
+ ret2 = util.is_resolvable_url("http://1.2.3.4/ubuntu")
+ mocksock.assert_any_call('does-not-exist.example.com.', None,
+ 0, 0, 1, 2)
+ mocksock.assert_any_call('example.invalid.', None, 0, 0, 1, 2)
+ mocksock.assert_any_call('us.archive.ubuntu.com', None)
+ mocksock.assert_any_call('1.2.3.4', None)
+
+ self.assertTrue(ret)
+ self.assertTrue(ret2)
+
+ # side effect need only bad ret after initial call
+ with mock.patch.object(socket, 'getaddrinfo',
+ side_effect=[bad]) as mocksock:
+ ret3 = util.is_resolvable_url("http://failme.com/ubuntu")
+ calls = [call('failme.com', None)]
+ mocksock.assert_has_calls(calls)
+ self.assertFalse(ret3)
+
+ def test_apt_v3_disable_suites(self):
+ """test_disable_suites - disable_suites with many configurations"""
+ release = "xenial"
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+
+ # disable nothing
+ disabled = []
+ expect = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable release suite
+ disabled = ["$RELEASE"]
+ expect = """\
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable other suite
+ disabled = ["$RELEASE-updates"]
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu"""
+ """ xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # multi disable
+ disabled = ["$RELEASE-updates", "$RELEASE-security"]
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-updates main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # multi line disable (same suite multiple times in input)
+ disabled = ["$RELEASE-updates", "$RELEASE-security"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://UBUNTU.com//ubuntu xenial-updates main
+deb http://UBUNTU.COM//ubuntu xenial-updates main
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-updates main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+# suite disabled by cloud-init: deb http://UBUNTU.com//ubuntu """
+ """xenial-updates main
+# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """
+ """xenial-updates main
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # comment in input
+ disabled = ["$RELEASE-updates", "$RELEASE-security"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+#foo
+#deb http://UBUNTU.com//ubuntu xenial-updates main
+deb http://UBUNTU.COM//ubuntu xenial-updates main
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-updates main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+#foo
+#deb http://UBUNTU.com//ubuntu xenial-updates main
+# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """
+ """xenial-updates main
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable custom suite
+ disabled = ["foobar"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb http://ubuntu.com/ubuntu/ foobar main"""
+ expect = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+# suite disabled by cloud-init: deb http://ubuntu.com/ubuntu/ foobar main"""
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable non existing suite
+ disabled = ["foobar"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb http://ubuntu.com/ubuntu/ notfoobar main"""
+ expect = """deb http://ubuntu.com//ubuntu xenial main
+deb http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb http://ubuntu.com/ubuntu/ notfoobar main"""
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable suite with option
+ disabled = ["$RELEASE-updates"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb [a=b] http://ubu.com//ubu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb [a=b] http://ubu.com//ubu """
+ """xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable suite with more options and auto $RELEASE expansion
+ disabled = ["updates"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb [a=b c=d] http://ubu.com//ubu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ expect = """deb http://ubuntu.com//ubuntu xenial main
+# suite disabled by cloud-init: deb [a=b c=d] \
+http://ubu.com//ubu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ # single disable suite while options at others
+ disabled = ["$RELEASE-security"]
+ orig = """deb http://ubuntu.com//ubuntu xenial main
+deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main
+deb http://ubuntu.com//ubuntu xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main"""
+ expect = ("""deb http://ubuntu.com//ubuntu xenial main
+deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main
+# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """
+ """xenial-security main
+deb-src http://ubuntu.com//ubuntu universe multiverse
+deb http://ubuntu.com/ubuntu/ xenial-proposed main""")
+ result = cc_apt_configure.disable_suites(disabled, orig, release)
+ self.assertEqual(expect, result)
+
+ def test_disable_suites_blank_lines(self):
+ """test_disable_suites_blank_lines - ensure blank lines allowed"""
+ lines = ["deb %(repo)s %(rel)s main universe",
+ "",
+ "deb %(repo)s %(rel)s-updates main universe",
+ " # random comment",
+ "#comment here",
+ ""]
+ rel = "trusty"
+ repo = 'http://example.com/mirrors/ubuntu'
+ orig = "\n".join(lines) % {'repo': repo, 'rel': rel}
+ self.assertEqual(
+ orig, cc_apt_configure.disable_suites(["proposed"], orig, rel))
+
+ def test_apt_v3_mirror_search_dns(self):
+ """test_apt_v3_mirror_search_dns - Test searching dns patterns"""
+ pmir = "phit"
+ smir = "shit"
+ arch = 'amd64'
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {"primary": [{'arches': ["default"],
+ "search_dns": True}],
+ "security": [{'arches': ["default"],
+ "search_dns": True}]}
+
+ with mock.patch.object(cc_apt_configure, 'get_mirror',
+ return_value="http://mocked/foo") as mockgm:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch)
+ calls = [call(cfg, 'primary', arch, mycloud),
+ call(cfg, 'security', arch, mycloud)]
+ mockgm.assert_has_calls(calls)
+
+ with mock.patch.object(cc_apt_configure, 'search_for_mirror_dns',
+ return_value="http://mocked/foo") as mocksdns:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch)
+ calls = [call(True, 'primary', cfg, mycloud),
+ call(True, 'security', cfg, mycloud)]
+ mocksdns.assert_has_calls(calls)
+
+ # first return is for the non-dns call before
+ with mock.patch.object(cc_apt_configure, 'search_for_mirror',
+ side_effect=[None, pmir, None, smir]) as mockse:
+ mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch)
+
+ calls = [call(None),
+ call(['http://ubuntu-mirror.localdomain/ubuntu',
+ 'http://ubuntu-mirror/ubuntu']),
+ call(None),
+ call(['http://ubuntu-security-mirror.localdomain/ubuntu',
+ 'http://ubuntu-security-mirror/ubuntu'])]
+ mockse.assert_has_calls(calls)
+
+ self.assertEqual(mirrors['MIRROR'],
+ pmir)
+ self.assertEqual(mirrors['PRIMARY'],
+ pmir)
+ self.assertEqual(mirrors['SECURITY'],
+ smir)
+
+
+class TestDebconfSelections(TestCase):
+
+ @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections")
+ def test_no_set_sel_if_none_to_set(self, m_set_sel):
+ cc_apt_configure.apply_debconf_selections({'foo': 'bar'})
+ m_set_sel.assert_not_called()
+
+ @mock.patch("cloudinit.config.cc_apt_configure."
+ "debconf_set_selections")
+ @mock.patch("cloudinit.config.cc_apt_configure."
+ "util.get_installed_packages")
+ def test_set_sel_call_has_expected_input(self, m_get_inst, m_set_sel):
+ data = {
+ 'set1': 'pkga pkga/q1 mybool false',
+ 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n'
+ 'pkgc\tpkgc/ip\tstring\t10.0.0.1')}
+ lines = '\n'.join(data.values()).split('\n')
+
+ m_get_inst.return_value = ["adduser", "apparmor"]
+ m_set_sel.return_value = None
+
+ cc_apt_configure.apply_debconf_selections({'debconf_selections': data})
+ self.assertTrue(m_get_inst.called)
+ self.assertEqual(m_set_sel.call_count, 1)
+
+ # assumes called with *args value.
+ selections = m_set_sel.call_args_list[0][0][0].decode()
+
+ missing = [l for l in lines if l not in selections.splitlines()]
+ self.assertEqual([], missing)
+
+ @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure")
+ @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections")
+ @mock.patch("cloudinit.config.cc_apt_configure."
+ "util.get_installed_packages")
+ def test_reconfigure_if_intersection(self, m_get_inst, m_set_sel,
+ m_dpkg_r):
+ data = {
+ 'set1': 'pkga pkga/q1 mybool false',
+ 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n'
+ 'pkgc\tpkgc/ip\tstring\t10.0.0.1'),
+ 'cloud-init': ('cloud-init cloud-init/datasources'
+ 'multiselect MAAS')}
+
+ m_set_sel.return_value = None
+ m_get_inst.return_value = ["adduser", "apparmor", "pkgb",
+ "cloud-init", 'zdog']
+
+ cc_apt_configure.apply_debconf_selections({'debconf_selections': data})
+
+ # reconfigure should be called with the intersection
+ # of (packages in config, packages installed)
+ self.assertEqual(m_dpkg_r.call_count, 1)
+ # assumes called with *args (dpkg_reconfigure([a,b,c], target=))
+ packages = m_dpkg_r.call_args_list[0][0][0]
+ self.assertEqual(set(['cloud-init', 'pkgb']), set(packages))
+
+ @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure")
+ @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections")
+ @mock.patch("cloudinit.config.cc_apt_configure."
+ "util.get_installed_packages")
+ def test_reconfigure_if_no_intersection(self, m_get_inst, m_set_sel,
+ m_dpkg_r):
+ data = {'set1': 'pkga pkga/q1 mybool false'}
+
+ m_get_inst.return_value = ["adduser", "apparmor", "pkgb",
+ "cloud-init", 'zdog']
+ m_set_sel.return_value = None
+
+ cc_apt_configure.apply_debconf_selections({'debconf_selections': data})
+
+ self.assertTrue(m_get_inst.called)
+ self.assertEqual(m_dpkg_r.call_count, 0)
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.subp")
+ def test_dpkg_reconfigure_does_reconfigure(self, m_subp):
+ target = "/foo-target"
+
+ # due to the way the cleaners are called (via dictionary reference)
+ # mocking clean_cloud_init directly does not work. So we mock
+ # the CONFIG_CLEANERS dictionary and assert our cleaner is called.
+ ci_cleaner = mock.MagicMock()
+ with mock.patch.dict(("cloudinit.config.cc_apt_configure."
+ "CONFIG_CLEANERS"),
+ values={'cloud-init': ci_cleaner}, clear=True):
+ cc_apt_configure.dpkg_reconfigure(['pkga', 'cloud-init'],
+ target=target)
+ # cloud-init is actually the only package we have a cleaner for
+ # so for now, its the only one that should reconfigured
+ self.assertTrue(m_subp.called)
+ ci_cleaner.assert_called_with(target)
+ self.assertEqual(m_subp.call_count, 1)
+ found = m_subp.call_args_list[0][0][0]
+ expected = ['dpkg-reconfigure', '--frontend=noninteractive',
+ 'cloud-init']
+ self.assertEqual(expected, found)
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.subp")
+ def test_dpkg_reconfigure_not_done_on_no_data(self, m_subp):
+ cc_apt_configure.dpkg_reconfigure([])
+ m_subp.assert_not_called()
+
+ @mock.patch("cloudinit.config.cc_apt_configure.util.subp")
+ def test_dpkg_reconfigure_not_done_if_no_cleaners(self, m_subp):
+ cc_apt_configure.dpkg_reconfigure(['pkgfoo', 'pkgbar'])
+ m_subp.assert_not_called()
+
+#
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_mcollective.py b/tests/unittests/test_handler/test_handler_mcollective.py
index f9448d80..c3a5a634 100644
--- a/tests/unittests/test_handler/test_handler_mcollective.py
+++ b/tests/unittests/test_handler/test_handler_mcollective.py
@@ -1,10 +1,12 @@
+from cloudinit import (cloud, distros, helpers, util)
from cloudinit.config import cc_mcollective
-from cloudinit import util
+from cloudinit.sources import DataSourceNoCloud
-from .. import helpers
+from .. import helpers as t_help
import configobj
import logging
+import os
import shutil
from six import BytesIO
import tempfile
@@ -12,11 +14,43 @@ import tempfile
LOG = logging.getLogger(__name__)
-class TestConfig(helpers.FilesystemMockingTestCase):
+STOCK_CONFIG = """\
+main_collective = mcollective
+collectives = mcollective
+libdir = /usr/share/mcollective/plugins
+logfile = /var/log/mcollective.log
+loglevel = info
+daemonize = 1
+
+# Plugins
+securityprovider = psk
+plugin.psk = unset
+
+connector = activemq
+plugin.activemq.pool.size = 1
+plugin.activemq.pool.1.host = stomp1
+plugin.activemq.pool.1.port = 61613
+plugin.activemq.pool.1.user = mcollective
+plugin.activemq.pool.1.password = marionette
+
+# Facts
+factsource = yaml
+plugin.yaml = /etc/mcollective/facts.yaml
+"""
+
+
+class TestConfig(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
+ # "./": make os.path.join behave correctly with abs path as second arg
+ self.server_cfg = os.path.join(
+ self.tmp, "./" + cc_mcollective.SERVER_CFG)
+ self.pubcert_file = os.path.join(
+ self.tmp, "./" + cc_mcollective.PUBCERT_FILE)
+ self.pricert_file = os.path.join(
+ self.tmp, self.tmp, "./" + cc_mcollective.PRICERT_FILE)
def test_basic_config(self):
cfg = {
@@ -38,23 +72,79 @@ class TestConfig(helpers.FilesystemMockingTestCase):
},
},
}
+ expected = cfg['mcollective']['conf']
+
self.patchUtils(self.tmp)
cc_mcollective.configure(cfg['mcollective']['conf'])
- contents = util.load_file("/etc/mcollective/server.cfg", decode=False)
+ contents = util.load_file(cc_mcollective.SERVER_CFG, decode=False)
contents = configobj.ConfigObj(BytesIO(contents))
- expected = {
- 'loglevel': 'debug',
- 'connector': 'rabbitmq',
- 'logfile': '/var/log/mcollective.log',
- 'ttl': '4294957',
- 'collectives': 'mcollective',
- 'main_collective': 'mcollective',
- 'securityprovider': 'psk',
- 'daemonize': '1',
- 'factsource': 'yaml',
- 'direct_addressing': '1',
- 'plugin.psk': 'unset',
- 'libdir': '/usr/share/mcollective/plugins',
- 'identity': '1',
- }
self.assertEqual(expected, dict(contents))
+
+ def test_existing_config_is_saved(self):
+ cfg = {'loglevel': 'warn'}
+ util.write_file(self.server_cfg, STOCK_CONFIG)
+ cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg)
+ self.assertTrue(os.path.exists(self.server_cfg))
+ self.assertTrue(os.path.exists(self.server_cfg + ".old"))
+ self.assertEqual(util.load_file(self.server_cfg + ".old"),
+ STOCK_CONFIG)
+
+ def test_existing_updated(self):
+ cfg = {'loglevel': 'warn'}
+ util.write_file(self.server_cfg, STOCK_CONFIG)
+ cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg)
+ cfgobj = configobj.ConfigObj(self.server_cfg)
+ self.assertEqual(cfg['loglevel'], cfgobj['loglevel'])
+
+ def test_certificats_written(self):
+ # check public-cert and private-cert keys in config get written
+ cfg = {'loglevel': 'debug',
+ 'public-cert': "this is my public-certificate",
+ 'private-cert': "secret private certificate"}
+
+ cc_mcollective.configure(
+ config=cfg, server_cfg=self.server_cfg,
+ pricert_file=self.pricert_file, pubcert_file=self.pubcert_file)
+
+ found = configobj.ConfigObj(self.server_cfg)
+
+ # make sure these didnt get written in
+ self.assertFalse('public-cert' in found)
+ self.assertFalse('private-cert' in found)
+
+ # these need updating to the specified paths
+ self.assertEqual(found['plugin.ssl_server_public'], self.pubcert_file)
+ self.assertEqual(found['plugin.ssl_server_private'], self.pricert_file)
+
+ # and the security provider should be ssl
+ self.assertEqual(found['securityprovider'], 'ssl')
+
+ self.assertEqual(
+ util.load_file(self.pricert_file), cfg['private-cert'])
+ self.assertEqual(
+ util.load_file(self.pubcert_file), cfg['public-cert'])
+
+
+class TestHandler(t_help.TestCase):
+ def _get_cloud(self, distro):
+ cls = distros.fetch(distro)
+ paths = helpers.Paths({})
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ @t_help.mock.patch("cloudinit.config.cc_mcollective.util")
+ def test_mcollective_install(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = t_help.mock.MagicMock()
+ mock_util.load_file.return_value = b""
+ mycfg = {'mcollective': {'conf': {'loglevel': 'debug'}}}
+ cc_mcollective.handle('cc_mcollective', mycfg, cc, LOG, [])
+ self.assertTrue(cc.distro.install_packages.called)
+ install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
+ self.assertEqual(install_pkg, ('mcollective',))
+
+ self.assertTrue(mock_util.subp.called)
+ self.assertEqual(mock_util.subp.call_args_list[0][0][0],
+ ['service', 'mcollective', 'restart'])
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
new file mode 100644
index 00000000..1c7bb06a
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -0,0 +1,274 @@
+from cloudinit.config import cc_ntp
+from cloudinit.sources import DataSourceNone
+from cloudinit import templater
+from cloudinit import (distros, helpers, cloud, util)
+from ..helpers import FilesystemMockingTestCase, mock
+
+import logging
+import os
+import shutil
+import tempfile
+
+LOG = logging.getLogger(__name__)
+
+NTP_TEMPLATE = """
+## template: jinja
+
+{% if pools %}# pools
+{% endif %}
+{% for pool in pools -%}
+pool {{pool}} iburst
+{% endfor %}
+{%- if servers %}# servers
+{% endif %}
+{% for server in servers -%}
+server {{server}} iburst
+{% endfor %}
+
+"""
+
+
+NTP_EXPECTED_UBUNTU = """
+# pools
+pool 0.mycompany.pool.ntp.org iburst
+# servers
+server 192.168.23.3 iburst
+
+"""
+
+
+class TestNtp(FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestNtp, self).setUp()
+ self.subp = util.subp
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_ntp_install(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = None
+ install_func = mock.MagicMock()
+
+ cc_ntp.install_ntp(install_func, packages=['ntpx'], check_exe='ntpdx')
+
+ self.assertTrue(install_func.called)
+ mock_util.which.assert_called_with('ntpdx')
+ install_pkg = install_func.call_args_list[0][0][0]
+ self.assertEqual(sorted(install_pkg), ['ntpx'])
+
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_ntp_install_not_needed(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc.distro.name = 'ubuntu'
+ mock_util.which.return_value = ["/usr/sbin/ntpd"]
+ cc_ntp.install_ntp(cc)
+ self.assertFalse(cc.distro.install_packages.called)
+
+ def test_ntp_rename_ntp_conf(self):
+ with mock.patch.object(os.path, 'exists',
+ return_value=True) as mockpath:
+ with mock.patch.object(util, 'rename') as mockrename:
+ cc_ntp.rename_ntp_conf()
+
+ mockpath.assert_called_with('/etc/ntp.conf')
+ mockrename.assert_called_with('/etc/ntp.conf', '/etc/ntp.conf.dist')
+
+ def test_ntp_rename_ntp_conf_skip_missing(self):
+ with mock.patch.object(os.path, 'exists',
+ return_value=False) as mockpath:
+ with mock.patch.object(util, 'rename') as mockrename:
+ cc_ntp.rename_ntp_conf()
+
+ mockpath.assert_called_with('/etc/ntp.conf')
+ mockrename.assert_not_called()
+
+ def ntp_conf_render(self, distro):
+ """ntp_conf_render
+ Test rendering of a ntp.conf from template for a given distro
+ """
+
+ cfg = {'ntp': {}}
+ mycloud = self._get_cloud(distro)
+ distro_names = cc_ntp.generate_server_names(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile', return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.write_ntp_config_template(cfg, mycloud)
+
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
+ '/etc/ntp.conf',
+ {'servers': [], 'pools': distro_names})
+
+ def test_ntp_conf_render_rhel(self):
+ """Test templater.render_to_file() for rhel"""
+ self.ntp_conf_render('rhel')
+
+ def test_ntp_conf_render_debian(self):
+ """Test templater.render_to_file() for debian"""
+ self.ntp_conf_render('debian')
+
+ def test_ntp_conf_render_fedora(self):
+ """Test templater.render_to_file() for fedora"""
+ self.ntp_conf_render('fedora')
+
+ def test_ntp_conf_render_sles(self):
+ """Test templater.render_to_file() for sles"""
+ self.ntp_conf_render('sles')
+
+ def test_ntp_conf_render_ubuntu(self):
+ """Test templater.render_to_file() for ubuntu"""
+ self.ntp_conf_render('ubuntu')
+
+ def test_ntp_conf_servers_no_pools(self):
+ distro = 'ubuntu'
+ pools = []
+ servers = ['192.168.2.1']
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers,
+ }
+ }
+ mycloud = self._get_cloud(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile', return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
+
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
+ '/etc/ntp.conf',
+ {'servers': servers, 'pools': pools})
+
+ def test_ntp_conf_custom_pools_no_server(self):
+ distro = 'ubuntu'
+ pools = ['0.mycompany.pool.ntp.org']
+ servers = []
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers,
+ }
+ }
+ mycloud = self._get_cloud(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile', return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
+
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
+ '/etc/ntp.conf',
+ {'servers': servers, 'pools': pools})
+
+ def test_ntp_conf_custom_pools_and_server(self):
+ distro = 'ubuntu'
+ pools = ['0.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3']
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers,
+ }
+ }
+ mycloud = self._get_cloud(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile', return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.write_ntp_config_template(cfg.get('ntp'), mycloud)
+
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/ntp.conf.%s.tmpl' % distro),
+ '/etc/ntp.conf',
+ {'servers': servers, 'pools': pools})
+
+ def test_ntp_conf_contents_match(self):
+ """Test rendered contents of /etc/ntp.conf for ubuntu"""
+ pools = ['0.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3']
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers,
+ }
+ }
+ mycloud = self._get_cloud('ubuntu')
+ side_effect = [NTP_TEMPLATE.lstrip()]
+
+ # work backwards from util.write_file and mock out call path
+ # write_ntp_config_template()
+ # cloud.get_template_filename()
+ # os.path.isfile()
+ # templater.render_to_file()
+ # templater.render_from_file()
+ # util.load_file()
+ # util.write_file()
+ #
+ with mock.patch.object(util, 'write_file') as mockwrite:
+ with mock.patch.object(util, 'load_file', side_effect=side_effect):
+ with mock.patch.object(os.path, 'isfile', return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.write_ntp_config_template(cfg.get('ntp'),
+ mycloud)
+
+ mockwrite.assert_called_once_with(
+ '/etc/ntp.conf',
+ NTP_EXPECTED_UBUNTU,
+ mode=420)
+
+ def test_ntp_handler(self):
+ """Test ntp handler renders ubuntu ntp.conf template"""
+ pools = ['0.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3']
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers,
+ }
+ }
+ mycloud = self._get_cloud('ubuntu')
+ side_effect = [NTP_TEMPLATE.lstrip()]
+
+ with mock.patch.object(util, 'which', return_value=None):
+ with mock.patch.object(os.path, 'exists'):
+ with mock.patch.object(util, 'write_file') as mockwrite:
+ with mock.patch.object(util, 'load_file',
+ side_effect=side_effect):
+ with mock.patch.object(os.path, 'isfile',
+ return_value=True):
+ with mock.patch.object(util, 'rename'):
+ cc_ntp.handle("notimportant", cfg,
+ mycloud, LOG, None)
+
+ mockwrite.assert_called_once_with(
+ '/etc/ntp.conf',
+ NTP_EXPECTED_UBUNTU,
+ mode=420)
+
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_no_ntpcfg_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_ntp.handle('cc_ntp', {}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
diff --git a/tests/unittests/test_handler/test_handler_spacewalk.py b/tests/unittests/test_handler/test_handler_spacewalk.py
new file mode 100644
index 00000000..44f95e4c
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_spacewalk.py
@@ -0,0 +1,42 @@
+from cloudinit.config import cc_spacewalk
+from cloudinit import util
+
+from .. import helpers
+
+import logging
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+LOG = logging.getLogger(__name__)
+
+
+class TestSpacewalk(helpers.TestCase):
+ space_cfg = {
+ 'spacewalk': {
+ 'server': 'localhost',
+ 'profile_name': 'test',
+ }
+ }
+
+ @mock.patch("cloudinit.config.cc_spacewalk.util.subp")
+ def test_not_is_registered(self, mock_util_subp):
+ mock_util_subp.side_effect = util.ProcessExecutionError(exit_code=1)
+ self.assertFalse(cc_spacewalk.is_registered())
+
+ @mock.patch("cloudinit.config.cc_spacewalk.util.subp")
+ def test_is_registered(self, mock_util_subp):
+ mock_util_subp.side_effect = None
+ self.assertTrue(cc_spacewalk.is_registered())
+
+ @mock.patch("cloudinit.config.cc_spacewalk.util.subp")
+ def test_do_register(self, mock_util_subp):
+ cc_spacewalk.do_register(**self.space_cfg['spacewalk'])
+ mock_util_subp.assert_called_with([
+ 'rhnreg_ks',
+ '--serverUrl', 'https://localhost/XMLRPC',
+ '--profilename', 'test',
+ '--sslCACert', cc_spacewalk.def_ca_cert_path,
+ ], capture=False)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 37a984ac..d2031f59 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -371,8 +371,30 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
self._create_sysfs_parent_directory()
expected_dmi_value = 'dmidecode-used'
self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
- self.assertEqual(expected_dmi_value,
- util.read_dmi_data('use-dmidecode'))
+ with mock.patch("cloudinit.util.os.uname") as m_uname:
+ m_uname.return_value = ('x-sysname', 'x-nodename',
+ 'x-release', 'x-version', 'x86_64')
+ self.assertEqual(expected_dmi_value,
+ util.read_dmi_data('use-dmidecode'))
+
+ def test_dmidecode_not_used_on_arm(self):
+ self.patch_mapping({})
+ self._create_sysfs_parent_directory()
+ dmi_val = 'from-dmidecode'
+ dmi_name = 'use-dmidecode'
+ self._configure_dmidecode_return(dmi_name, dmi_val)
+
+ expected = {'armel': None, 'aarch64': None, 'x86_64': dmi_val}
+ found = {}
+ # we do not run the 'dmi-decode' binary on some arches
+ # verify that anything requested that is not in the sysfs dir
+ # will return None on those arches.
+ with mock.patch("cloudinit.util.os.uname") as m_uname:
+ for arch in expected:
+ m_uname.return_value = ('x-sysname', 'x-nodename',
+ 'x-release', 'x-version', arch)
+ found[arch] = util.read_dmi_data(dmi_name)
+ self.assertEqual(expected, found)
def test_none_returned_if_neither_source_has_data(self):
self.patch_mapping({})
@@ -486,4 +508,73 @@ class TestReadSeeded(helpers.TestCase):
self.assertEqual(found_md, {'key1': 'val1'})
self.assertEqual(found_ud, ud)
+
+class TestSubp(helpers.TestCase):
+
+ stdin2err = ['bash', '-c', 'cat >&2']
+ stdin2out = ['cat']
+ utf8_invalid = b'ab\xaadef'
+ utf8_valid = b'start \xc3\xa9 end'
+ utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
+
+ def printf_cmd(self, *args):
+ # bash's printf supports \xaa. So does /usr/bin/printf
+ # but by using bash, we remove dependency on another program.
+ return(['bash', '-c', 'printf "$@"', 'printf'] + list(args))
+
+ def test_subp_handles_utf8(self):
+ # The given bytes contain utf-8 accented characters as seen in e.g.
+ # the "deja dup" package in Ubuntu.
+ cmd = self.printf_cmd(self.utf8_valid_2)
+ (out, _err) = util.subp(cmd, capture=True)
+ self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))
+
+ def test_subp_respects_decode_false(self):
+ (out, err) = util.subp(self.stdin2out, capture=True, decode=False,
+ data=self.utf8_valid)
+ self.assertTrue(isinstance(out, bytes))
+ self.assertTrue(isinstance(err, bytes))
+ self.assertEqual(out, self.utf8_valid)
+
+ def test_subp_decode_ignore(self):
+ # this executes a string that writes invalid utf-8 to stdout
+ (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'),
+ capture=True, decode='ignore')
+ self.assertEqual(out, 'abcdef')
+
+ def test_subp_decode_strict_valid_utf8(self):
+ (out, _err) = util.subp(self.stdin2out, capture=True,
+ decode='strict', data=self.utf8_valid)
+ self.assertEqual(out, self.utf8_valid.decode('utf-8'))
+
+ def test_subp_decode_invalid_utf8_replaces(self):
+ (out, _err) = util.subp(self.stdin2out, capture=True,
+ data=self.utf8_invalid)
+ expected = self.utf8_invalid.decode('utf-8', errors='replace')
+ self.assertEqual(out, expected)
+
+ def test_subp_decode_strict_raises(self):
+ args = []
+ kwargs = {'args': self.stdin2out, 'capture': True,
+ 'decode': 'strict', 'data': self.utf8_invalid}
+ self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs)
+
+ def test_subp_capture_stderr(self):
+ data = b'hello world'
+ (out, err) = util.subp(self.stdin2err, capture=True,
+ decode=False, data=data)
+ self.assertEqual(err, data)
+ self.assertEqual(out, b'')
+
+ def test_returns_none_if_no_capture(self):
+ (out, err) = util.subp(self.stdin2out, data=b'', capture=False)
+ self.assertEqual(err, None)
+ self.assertEqual(out, None)
+
+ def test_bunch_of_slashes_in_path(self):
+ self.assertEqual("/target/my/path/",
+ util.target_path("/target/", "//my/path/"))
+ self.assertEqual("/target/my/path/",
+ util.target_path("/target/", "///my/path/"))
+
# vi: ts=4 expandtab