summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2016-06-06 18:42:29 -0700
committerJoshua Harlow <harlowja@gmail.com>2016-06-06 18:42:29 -0700
commitf640797e342b6efbfb838a6350b312935222e992 (patch)
tree4e893298101cf3141d80b4bf2a2d6e009462502d /tests/unittests
parent85a53d66ad0241b2d6453d902487bb2edc1512b8 (diff)
parentbc9bd58d1533d996029770da758f73217c15af33 (diff)
downloadvyos-cloud-init-f640797e342b6efbfb838a6350b312935222e992.tar.gz
vyos-cloud-init-f640797e342b6efbfb838a6350b312935222e992.zip
Rebase against master
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py125
-rw-r--r--tests/unittests/test_datasource/test_openstack.py32
-rw-r--r--tests/unittests/test_datasource/test_smartos.py290
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list.py165
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source.py551
-rw-r--r--tests/unittests/test_merging.py4
6 files changed, 973 insertions, 194 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 5395e544..b898e2bd 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -6,6 +6,8 @@ import six
import tempfile
from cloudinit import helpers
+from cloudinit.net import eni
+from cloudinit.net import network_state
from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
@@ -64,7 +66,7 @@ NETWORK_DATA = {
'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
{'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc',
'ethernet_mac_address': 'fa:16:3e:05:30:fe',
- 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'}
+ 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'}
],
'networks': [
{'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp',
@@ -79,6 +81,35 @@ NETWORK_DATA = {
]
}
+NETWORK_DATA_2 = {
+ "services": [
+ {"type": "dns", "address": "1.1.1.191"},
+ {"type": "dns", "address": "1.1.1.4"}],
+ "networks": [
+ {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4",
+ "netmask": "255.255.255.248", "link": "eth0",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "2.2.2.9"}],
+ "ip_address": "2.2.2.10", "id": "network0-ipv4"},
+ {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4",
+ "netmask": "255.255.255.224", "link": "eth1",
+ "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}],
+ "links": [
+ {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500,
+ "type": "vif", "id": "eth0", "vif_id": "vif-foo1"},
+ {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500,
+ "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}]
+}
+
+
+KNOWN_MACS = {
+ 'fa:16:3e:69:b0:58': 'enp0s1',
+ 'fa:16:3e:d4:57:ad': 'enp0s2',
+ 'fa:16:3e:dd:50:9a': 'foo1',
+ 'fa:16:3e:a8:14:69': 'foo2',
+ 'fa:16:3e:ed:9a:59': 'foo3',
+}
+
CFG_DRIVE_FILES_V2 = {
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
'ec2/2009-04-04/user-data': USER_DATA,
@@ -358,13 +389,14 @@ class TestNetJson(TestCase):
"""Verify that network_data is present in ds in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- self.assertEqual(myds.network_json, NETWORK_DATA)
+ self.assertIsNotNone(myds.network_json)
def test_network_config_is_converted(self):
"""Verify that network_data is converted and present on ds object."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- network_config = openstack.convert_net_json(NETWORK_DATA)
+ network_config = openstack.convert_net_json(NETWORK_DATA,
+ known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
def test_network_config_conversions(self):
@@ -404,19 +436,22 @@ class TestNetJson(TestCase):
'subnets': [{'type': 'dhcp4'}],
'type': 'physical',
'mac_address': 'fa:16:3e:69:b0:58',
- 'name': 'tap2ecc7709-b3',
+ 'name': 'enp0s1',
+ 'mtu': None,
},
{
'subnets': [{'type': 'dhcp4'}],
'type': 'physical',
'mac_address': 'fa:16:3e:d4:57:ad',
- 'name': 'tap2f88d109-5b',
+ 'name': 'enp0s2',
+ 'mtu': None,
},
{
'subnets': [{'type': 'dhcp4'}],
'type': 'physical',
'mac_address': 'fa:16:3e:05:30:fe',
- 'name': 'tap1a5382f8-04',
+ 'name': 'nic0',
+ 'mtu': None,
},
{
'type': 'nameserver',
@@ -433,10 +468,10 @@ class TestNetJson(TestCase):
'version': 1,
'config': [
{
- 'name': 'tap1a81968a-79',
+ 'name': 'foo3',
'mac_address': 'fa:16:3e:ed:9a:59',
'mtu': None,
- 'type': 'bridge',
+ 'type': 'physical',
'subnets': [
{
'address': '172.19.1.34',
@@ -459,20 +494,90 @@ class TestNetJson(TestCase):
},
]
for in_data, out_data in zip(in_datas, out_datas):
- self.assertEqual(openstack.convert_net_json(in_data),
- out_data)
+ conv_data = openstack.convert_net_json(in_data,
+ known_macs=KNOWN_MACS)
+ self.assertEqual(out_data, conv_data)
+
+
+class TestConvertNetworkData(TestCase):
+ def setUp(self):
+ super(TestConvertNetworkData, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def _getnames_in_config(self, ncfg):
+ return set([n['name'] for n in ncfg['config']
+ if n['type'] == 'physical'])
+
+ def test_conversion_fills_names(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS)
+ expected = set(['nic0', 'enp0s1', 'enp0s2'])
+ found = self._getnames_in_config(ncfg)
+ self.assertEqual(found, expected)
+
+ @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac):
+ macs = KNOWN_MACS.copy()
+ macs.update({'fa:16:3e:05:30:fe': 'foonic1',
+ 'fa:16:3e:69:b0:58': 'ens1'})
+ get_interfaces_by_mac.return_value = macs
+
+ ncfg = openstack.convert_net_json(NETWORK_DATA)
+ expected = set(['nic0', 'ens1', 'enp0s2'])
+ found = self._getnames_in_config(ncfg)
+ self.assertEqual(found, expected)
+
+ def test_convert_raises_value_error_on_missing_name(self):
+ macs = {'aa:aa:aa:aa:aa:00': 'ens1'}
+ self.assertRaises(ValueError, openstack.convert_net_json,
+ NETWORK_DATA, known_macs=macs)
+
+ def test_conversion_with_route(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA_2,
+ known_macs=KNOWN_MACS)
+ # not the best test, but see that we get a route in the
+ # network config and that it gets rendered to an ENI file
+ routes = []
+ for n in ncfg['config']:
+ for s in n.get('subnets', []):
+ routes.extend(s.get('routes', []))
+ self.assertIn(
+ {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'},
+ routes)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+ self.assertIn("route add default gw 2.2.2.9", eni_rendering)
def cfg_ds_from_dir(seed_d):
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
helpers.Paths({}))
cfg_ds.seed_dir = seed_d
+ cfg_ds.known_macs = KNOWN_MACS.copy()
if not cfg_ds.get_data(skip_first_boot=True):
raise RuntimeError("Data source did not extract itself from"
" seed directory %s" % seed_d)
return cfg_ds
+def populate_ds_from_read_config(cfg_ds, source, results):
+ """Patch the DataSourceConfigDrive from the results of
+ read_config_drive_dir hopefully in line with what it would have
+ if cfg_ds.get_data had been successfully called"""
+ cfg_ds.source = source
+ cfg_ds.metadata = results.get('metadata')
+ cfg_ds.ec2_metadata = results.get('ec2-metadata')
+ cfg_ds.userdata_raw = results.get('userdata')
+ cfg_ds.version = results.get('version')
+ cfg_ds.network_json = results.get('networkdata')
+ cfg_ds._network_config = openstack.convert_net_json(
+ cfg_ds.network_json, known_macs=KNOWN_MACS)
+
+
def populate_dir(seed_dir, files):
for (name, content) in files.items():
path = os.path.join(seed_dir, name)
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 4140d054..5c8592c5 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -135,13 +135,17 @@ def _register_uris(version, ec2_files, ec2_meta, os_files):
body=get_request_callback)
+def _read_metadata_service():
+ return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1)
+
+
class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
VERSION = 'latest'
@hp.activate
def test_successful(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
- f = ds.read_metadata_service(BASE_URL)
+ f = _read_metadata_service()
self.assertEqual(VENDOR_DATA, f.get('vendordata'))
self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -163,7 +167,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
@hp.activate
def test_no_ec2(self):
_register_uris(self.VERSION, {}, {}, OS_FILES)
- f = ds.read_metadata_service(BASE_URL)
+ f = _read_metadata_service()
self.assertEqual(VENDOR_DATA, f.get('vendordata'))
self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -178,8 +182,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.NonReadable, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.NonReadable, _read_metadata_service)
@hp.activate
def test_bad_uuid(self):
@@ -190,8 +193,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = json.dumps(os_meta)
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_userdata_empty(self):
@@ -200,7 +202,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('user_data'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL)
+ f = _read_metadata_service()
self.assertEqual(VENDOR_DATA, f.get('vendordata'))
self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -213,7 +215,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL)
+ f = _read_metadata_service()
self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('vendordata'))
@@ -225,8 +227,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_metadata_invalid(self):
@@ -235,8 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_datasource(self):
@@ -245,7 +245,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertTrue(found)
self.assertEqual(2, ds_os.version)
md = dict(ds_os.metadata)
@@ -269,7 +269,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
@@ -288,7 +288,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
'timeout': 0,
}
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
@@ -311,7 +311,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
'timeout': 0,
}
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 28f56039..ddbb8fbd 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -25,6 +25,7 @@
from __future__ import print_function
from binascii import crc32
+import json
import os
import os.path
import re
@@ -41,8 +42,49 @@ import six
from cloudinit import helpers as c_helpers
from cloudinit.util import b64e
-from .. import helpers
-from ..helpers import mock
+from ..helpers import mock, FilesystemMockingTestCase, TestCase
+
+SDC_NICS = json.loads("""
+[
+ {
+ "nic_tag": "external",
+ "primary": true,
+ "mtu": 1500,
+ "model": "virtio",
+ "gateway": "8.12.42.1",
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.102",
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "vlan_id": 324,
+ "mac": "90:b8:d0:f5:e4:f5",
+ "interface": "net0",
+ "ips": [
+ "8.12.42.102/24"
+ ]
+ },
+ {
+ "nic_tag": "sdc_overlay/16187209",
+ "gateway": "192.168.128.1",
+ "model": "virtio",
+ "mac": "90:b8:d0:a5:ff:cd",
+ "netmask": "255.255.252.0",
+ "ip": "192.168.128.93",
+ "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9",
+ "gateways": [
+ "192.168.128.1"
+ ],
+ "vlan_id": 2,
+ "mtu": 8500,
+ "interface": "net1",
+ "ips": [
+ "192.168.128.93/22"
+ ]
+ }
+]
+""")
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -57,79 +99,66 @@ MOCK_RETURNS = {
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
+ 'sdc:nics': json.dumps(SDC_NICS),
}
DMI_DATA_RETURN = 'smartdc'
-def get_mock_client(mockdata):
- class MockMetadataClient(object):
+class PsuedoJoyentClient(object):
+ def __init__(self, data=None):
+ if data is None:
+ data = MOCK_RETURNS.copy()
+ self.data = data
+ return
- def __init__(self, serial):
- pass
+ def get(self, key, default=None, strip=False):
+ if key in self.data:
+ r = self.data[key]
+ if strip:
+ r = r.strip()
+ else:
+ r = default
+ return r
- def get_metadata(self, metadata_key):
- return mockdata.get(metadata_key)
- return MockMetadataClient
+ def get_json(self, key, default=None):
+ result = self.get(key, default=default)
+ if result is None:
+ return default
+ return json.loads(result)
+ def exists(self):
+ return True
-class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
+
+class TestSmartOSDataSource(FilesystemMockingTestCase):
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
+ dsmos = 'cloudinit.sources.DataSourceSmartOS'
+ patcher = mock.patch(dsmos + ".jmc_client_factory")
+ self.jmc_cfact = patcher.start()
+ self.addCleanup(patcher.stop)
+ patcher = mock.patch(dsmos + ".get_smartos_environ")
+ self.get_smartos_environ = patcher.start()
+ self.addCleanup(patcher.stop)
+
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
- self.legacy_user_d = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.legacy_user_d)
-
- # If you should want to watch the logs...
- self._log = None
- self._log_file = None
- self._log_handler = None
-
- # patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
- self.unapply = []
- super(TestSmartOSDataSource, self).setUp()
+ self.legacy_user_d = tempfile.mkdtemp()
+ self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
+ DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
- if self._log_handler and self._log:
- self._log.removeHandler(self._log_handler)
- apply_patches([i for i in reversed(self.unapply)])
+ DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
super(TestSmartOSDataSource, self).tearDown()
- def _patchIn(self, root):
- self.restore()
- self.patchOS(root)
- self.patchUtils(root)
-
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None,
- is_lxbrand=False):
- mod = DataSourceSmartOS
-
- if mockdata is None:
- mockdata = MOCK_RETURNS
-
- if dmi_data is None:
- dmi_data = DMI_DATA_RETURN
-
- def _dmi_data():
- return dmi_data
-
- def _os_uname():
- if not is_lxbrand:
- # LP: #1243287. tests assume this runs, but running test on
- # arm would cause them all to fail.
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
- else:
- return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX',
- 'X86_64')
+ def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
+ sys_cfg=None, ds_cfg=None):
+ self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
+ self.get_smartos_environ.return_value = mode
if sys_cfg is None:
sys_cfg = {}
@@ -138,44 +167,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource'] = sys_cfg.get('datasource', {})
sys_cfg['datasource']['SmartOS'] = ds_cfg
- self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([
- (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
- self.apply_patches([(mod, 'dmi_data', _dmi_data)])
- self.apply_patches([(os, 'uname', _os_uname)])
- self.apply_patches([(mod, 'device_exists', lambda d: True)])
- dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
- paths=self.paths)
- self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])
- return dsrc
-
- def test_seed(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual('kvm', dsrc.smartos_type)
- self.assertEqual('/dev/ttyS1', dsrc.seed)
-
- def test_seed_lxbrand(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual('lx-brand', dsrc.smartos_type)
- self.assertEqual('/native/.zonecontrol/metadata.sock', dsrc.seed)
-
- def test_issmartdc(self):
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
-
- def test_issmartdc_lxbrand(self):
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
+ return DataSourceSmartOS.DataSourceSmartOS(
+ sys_cfg, distro=None, paths=self.paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -211,58 +204,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.assertEqual(MOCK_RETURNS['hostname'],
dsrc.metadata['local-hostname'])
- def test_base64_all(self):
- # metadata provided base64_all of true
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_all'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
- self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
- self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
-
- def test_b64_userdata(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['b64-cloud-init:user-data'] = "true"
- my_returns['b64-hostname'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
-
- def test_b64_keys(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_keys'] = 'hostname,ignored'
- for k in ('hostname',):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
-
def test_userdata(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
@@ -272,6 +213,13 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
dsrc.userdata_raw)
+ def test_sdc_nics(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
+ dsrc.metadata['network-data'])
+
def test_sdc_scripts(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
@@ -427,18 +375,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
-class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+class TestJoyentMetadataClient(FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
@@ -479,7 +416,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
mock.Mock(return_value=self.request_id)))
def _get_client(self):
- return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+ return DataSourceSmartOS.JoyentMetadataClient(
+ fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
def assertEndsWith(self, haystack, prefix):
self.assertTrue(haystack.endswith(prefix),
@@ -493,7 +431,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_writes_a_single_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
print(type(written_line))
@@ -503,7 +441,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def _get_written_line(self, key='some_key'):
client = self._get_client()
- client.get_metadata(key)
+ client.get(key)
return self.serial.write.call_args[0][0]
def test_get_metadata_writes_bytes(self):
@@ -547,32 +485,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_reads_a_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
- value = client.get_metadata('some_key')
+ value = client.get('some_key')
self.assertEqual(self.metadata_value, value)
def test_get_metadata_throws_exception_for_incorrect_length(self):
self.response_parts['length'] = 0
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_incorrect_crc(self):
self.response_parts['crc'] = 'deadbeef'
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_request_id_mismatch(self):
self.response_parts['request_id'] = 'deadbeef'
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_returns_None_if_value_not_found(self):
self.response_parts['payload'] = ''
@@ -580,4 +518,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
self.response_parts['length'] = 17
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
- self.assertIsNone(client.get_metadata('some_key'))
+ self.assertIsNone(client.get('some_key'))
+
+
+class TestNetworkConversion(TestCase):
+
+ def test_convert_simple(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'netmask': '255.255.255.0',
+ 'address': '8.12.42.102/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '192.168.128.1',
+ 'netmask': '255.255.252.0',
+ 'address': '192.168.128.93/22'}],
+ 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
+ found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS)
+ self.assertEqual(expected, found)
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
new file mode 100644
index 00000000..5d0417a2
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
@@ -0,0 +1,165 @@
+""" test_handler_apt_configure_sources_list
+Test templating of sources list
+"""
+import logging
+import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import templater
+from cloudinit import util
+
+from cloudinit.config import cc_apt_configure
+from cloudinit.sources import DataSourceNone
+
+from .. import helpers as t_help
+
+LOG = logging.getLogger(__name__)
+
+YAML_TEXT_CUSTOM_SL = """
+apt_mirror: http://archive.ubuntu.com/ubuntu/
+apt_custom_sources_list: |
+ ## template:jinja
+ ## Note, this file is written by cloud-init on first boot of an instance
+ ## modifications made here will not survive a re-bundle.
+ ## if you wish to make changes you can:
+ ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
+ ## or do the same in user-data
+ ## b.) add sources in /etc/apt/sources.list.d
+ ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
+
+ # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+ # newer versions of the distribution.
+ deb {{mirror}} {{codename}} main restricted
+ deb-src {{mirror}} {{codename}} main restricted
+ # FIND_SOMETHING_SPECIAL
+"""
+
+EXPECTED_CONVERTED_CONTENT = (
+ """## Note, this file is written by cloud-init on first boot of an instance
+## modifications made here will not survive a re-bundle.
+## if you wish to make changes you can:
+## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
+## or do the same in user-data
+## b.) add sources in /etc/apt/sources.list.d
+## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
+
+# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+# newer versions of the distribution.
+deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
+deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
+# FIND_SOMETHING_SPECIAL
+""")
+
+
+def load_tfile_or_url(*args, **kwargs):
+ """load_tfile_or_url
+ load file and return content after decoding
+ """
+ return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+
+
+class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
+ """TestAptSourceConfigSourceList
+ Main Class to test sources list rendering
+ """
+ def setUp(self):
+ super(TestAptSourceConfigSourceList, self).setUp()
+ self.subp = util.subp
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def apt_source_list(self, distro, mirror, mirrorcheck=None):
+ """apt_source_list
+ Test rendering of a source.list from template for a given distro
+ """
+ if mirrorcheck is None:
+ mirrorcheck = mirror
+
+ if isinstance(mirror, list):
+ cfg = {'apt_mirror_search': mirror}
+ else:
+ cfg = {'apt_mirror': mirror}
+ mycloud = self._get_cloud(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile',
+ return_value=True) as mockisfile:
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
+
+ mockisfile.assert_any_call(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro),
+ '/etc/apt/sources.list',
+ {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck})
+
+ def test_apt_source_list_debian(self):
+ """test_apt_source_list_debian
+ Test rendering of a source.list from template for debian
+ """
+ self.apt_source_list('debian', 'http://httpredir.debian.org/debian')
+
+ def test_apt_source_list_ubuntu(self):
+ """test_apt_source_list_ubuntu
+ Test rendering of a source.list from template for ubuntu
+ """
+ self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/')
+
+ def test_apt_srcl_debian_mirrorfail(self):
+ """test_apt_source_list_debian_mirrorfail
+ Test rendering of a source.list from template for debian
+ """
+ self.apt_source_list('debian', ['http://does.not.exist',
+ 'http://httpredir.debian.org/debian'],
+ 'http://httpredir.debian.org/debian')
+
+ def test_apt_srcl_ubuntu_mirrorfail(self):
+ """test_apt_source_list_ubuntu_mirrorfail
+ Test rendering of a source.list from template for ubuntu
+ """
+ self.apt_source_list('ubuntu', ['http://does.not.exist',
+ 'http://archive.ubuntu.com/ubuntu/'],
+ 'http://archive.ubuntu.com/ubuntu/')
+
+ def test_apt_srcl_custom(self):
+ """test_apt_srcl_custom
+ Test rendering from a custom source.list template
+ """
+ cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL)
+ mycloud = self._get_cloud('ubuntu')
+
+ # the second mock restores the original subp
+ with mock.patch.object(util, 'write_file') as mockwrite:
+ with mock.patch.object(util, 'subp', self.subp):
+ with mock.patch.object(cc_apt_configure, 'get_release',
+ return_value='fakerelease'):
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
+
+ mockwrite.assert_called_once_with(
+ '/etc/apt/sources.list',
+ EXPECTED_CONVERTED_CONTENT,
+ mode=420)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py
new file mode 100644
index 00000000..4dbe69f0
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_source.py
@@ -0,0 +1,551 @@
+""" test_handler_apt_source
+Testing various config variations of the apt_source config
+"""
+import os
+import re
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+from mock import call
+
+from cloudinit.config import cc_apt_configure
+from cloudinit import util
+
+from ..helpers import TestCase
+
+EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ
+NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy
+8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0
+HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG
+CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29
+OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ
+FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm
+S0ORP6HXET3+jC8BMG4tBWCTK/XEZw==
+=ACB2
+-----END PGP PUBLIC KEY BLOCK-----"""
+
+
+def load_tfile_or_url(*args, **kwargs):
+ """load_tfile_or_url
+ load file and return content after decoding
+ """
+ return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+
+
+class TestAptSourceConfig(TestCase):
+ """TestAptSourceConfig
+ Main Class to test apt_source configs
+ """
+ def setUp(self):
+ super(TestAptSourceConfig, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.aptlistfile = os.path.join(self.tmp, "single-deb.list")
+ self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list")
+ self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
+ self.join = os.path.join
+ # mock fallback filename into writable tmp dir
+ self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/",
+ "cloud_config_sources.list")
+
+ @staticmethod
+ def _get_default_params():
+ """get_default_params
+ Get the most basic default mrror and release info to be used in tests
+ """
+ params = {}
+ params['RELEASE'] = cc_apt_configure.get_release()
+ params['MIRROR'] = "http://archive.ubuntu.com/ubuntu"
+ return params
+
+ def myjoin(self, *args, **kwargs):
+ """myjoin - redir into writable tmpdir"""
+ if (args[0] == "/etc/apt/sources.list.d/" and
+ args[1] == "cloud_config_sources.list" and
+ len(args) == 2):
+ return self.join(self.tmp, args[0].lstrip("/"), args[1])
+ else:
+ return self.join(*args, **kwargs)
+
+ def apt_src_basic(self, filename, cfg):
+ """apt_src_basic
+ Test Fix deb source string, has to overwrite mirror conf in params
+ """
+ params = self._get_default_params()
+
+ cc_apt_configure.add_sources(cfg, params)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "karmic-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_basic(self):
+ """test_apt_src_basic
+ Test Fix deb source string, has to overwrite mirror conf in params.
+ Test with a filename provided in config.
+ """
+ cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile}
+ self.apt_src_basic(self.aptlistfile, [cfg])
+
+ def test_apt_src_basic_dict(self):
+ """test_apt_src_basic_dict
+ Test Fix deb source string, has to overwrite mirror conf in params.
+ Test with a filename provided in config.
+ Provided in a dictionary with filename being the key (new format)
+ """
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')}}
+ self.apt_src_basic(self.aptlistfile, cfg)
+
+ def apt_src_basic_tri(self, cfg):
+ """apt_src_basic_tri
+ Test Fix three deb source string, has to overwrite mirror conf in
+ params. Test with filenames provided in config.
+ generic part to check three files with different content
+ """
+ self.apt_src_basic(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "precise-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "lucid-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_basic_tri(self):
+ """test_apt_src_basic_tri
+ Test Fix three deb source string, has to overwrite mirror conf in
+ params. Test with filenames provided in config.
+ """
+ cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' precise-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' lucid-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile3}
+ self.apt_src_basic_tri([cfg1, cfg2, cfg3])
+
+ def test_apt_src_basic_dict_tri(self):
+ """test_apt_src_basic_dict_tri
+ Test Fix three deb source string, has to overwrite mirror conf in
+ params. Test with filenames provided in config.
+ Provided in a dictionary with filename being the key (new format)
+ """
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile2: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' precise-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile3: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' lucid-backports'
+ ' main universe multiverse restricted')}}
+ self.apt_src_basic_tri(cfg)
+
+ def test_apt_src_basic_nofn(self):
+ """test_apt_src_basic_nofn
+ Test Fix deb source string, has to overwrite mirror conf in params.
+ Test without a filename provided in config and test for known fallback.
+ """
+ cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_basic(self.fallbackfn, [cfg])
+
+ def apt_src_replacement(self, filename, cfg):
+ """apt_src_replace
+ Test Autoreplacement of MIRROR and RELEASE in source specs
+ """
+ params = self._get_default_params()
+ cc_apt_configure.add_sources(cfg, params)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_replace(self):
+ """test_apt_src_replace
+ Test Autoreplacement of MIRROR and RELEASE in source specs with
+ Filename being set
+ """
+ cfg = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ self.apt_src_replacement(self.aptlistfile, [cfg])
+
+ def apt_src_replace_tri(self, cfg):
+ """apt_src_replace_tri
+ Test three autoreplacements of MIRROR and RELEASE in source specs with
+ generic part
+ """
+ self.apt_src_replacement(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ params = self._get_default_params()
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "main"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "universe"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_replace_tri(self):
+ """test_apt_src_replace_tri
+ Test three autoreplacements of MIRROR and RELEASE in source specs with
+ Filename being set
+ """
+ cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
+ 'filename': self.aptlistfile3}
+ self.apt_src_replace_tri([cfg1, cfg2, cfg3])
+
+ def test_apt_src_replace_dict_tri(self):
+ """test_apt_src_replace_dict_tri
+ Test three autoreplacements of MIRROR and RELEASE in source specs with
+ Filename being set
+ Provided in a dictionary with filename being the key (new format)
+ We also test a new special conditions of the new format that allows
+ filenames to be overwritten inside the directory entry.
+ """
+ cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'},
+ 'notused': {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2},
+ self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}}
+ self.apt_src_replace_tri(cfg)
+
+ def test_apt_src_replace_nofn(self):
+ """test_apt_src_replace_nofn
+ Test Autoreplacement of MIRROR and RELEASE in source specs with
+ No filename being set
+ """
+ cfg = {'source': 'deb $MIRROR $RELEASE multiverse'}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_replacement(self.fallbackfn, [cfg])
+
+ def apt_src_keyid(self, filename, cfg, keynum):
+ """apt_src_keyid
+ Test specification of a source + keyid
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(util, 'subp',
+ return_value=('fakekey 1234', '')) as mockobj:
+ cc_apt_configure.add_sources(cfg, params)
+
+ # check if it added the right ammount of keys
+ calls = []
+ for _ in range(keynum):
+ calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234'))
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_keyid(self):
+ """test_apt_src_keyid
+ Test specification of a source + keyid with filename being set
+ """
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+ self.apt_src_keyid(self.aptlistfile, [cfg], 1)
+
+ def test_apt_src_keyid_tri(self):
+ """test_apt_src_keyid_tri
+ Test specification of a source + keyid with filename being set
+ Setting three of such, check for content and keys
+ """
+ cfg1 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial universe'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial multiverse'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile3}
+
+ self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3)
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "universe"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_keyid_nofn(self):
+ """test_apt_src_keyid_nofn
+ Test specification of a source + keyid without filename being set
+ """
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77"}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_keyid(self.fallbackfn, [cfg], 1)
+
+ def apt_src_key(self, filename, cfg):
+ """apt_src_key
+ Test specification of a source + key
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_sources([cfg], params)
+
+ mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321')
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_key(self):
+ """test_apt_src_key
+ Test specification of a source + key with filename being set
+ """
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'key': "fakekey 4321",
+ 'filename': self.aptlistfile}
+ self.apt_src_key(self.aptlistfile, cfg)
+
+ def test_apt_src_key_nofn(self):
+ """test_apt_src_key_nofn
+ Test specification of a source + key without filename being set
+ """
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'key': "fakekey 4321"}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_key(self.fallbackfn, cfg)
+
+ def test_apt_src_keyonly(self):
+ """test_apt_src_keyonly
+ Test specification key without source
+ """
+ params = self._get_default_params()
+ cfg = {'key': "fakekey 4242",
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_sources([cfg], params)
+
+ mockobj.assert_called_once_with(('apt-key', 'add', '-'),
+ 'fakekey 4242')
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_keyidonly(self):
+ """test_apt_src_keyidonly
+ Test specification of a keyid without source
+ """
+ params = self._get_default_params()
+ cfg = {'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(util, 'subp',
+ return_value=('fakekey 1212', '')) as mockobj:
+ cc_apt_configure.add_sources([cfg], params)
+
+ mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212')
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_keyid_real(self):
+ """test_apt_src_keyid_real
+ Test specification of a keyid without source incl
+ up to addition of the key (nothing but add_key_raw mocked)
+ """
+ keyid = "03683F77"
+ params = self._get_default_params()
+ cfg = {'keyid': keyid,
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(cc_apt_configure, 'add_key_raw') as mockobj:
+ cc_apt_configure.add_sources([cfg], params)
+
+ mockobj.assert_called_with(EXPECTEDKEY)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_longkeyid_real(self):
+ """test_apt_src_longkeyid_real
+ Test specification of a long key fingerprint without source incl
+ up to addition of the key (nothing but add_key_raw mocked)
+ """
+ keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
+ params = self._get_default_params()
+ cfg = {'keyid': keyid,
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(cc_apt_configure, 'add_key_raw') as mockobj:
+ cc_apt_configure.add_sources([cfg], params)
+
+ mockobj.assert_called_with(EXPECTEDKEY)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_ppa(self):
+ """test_apt_src_ppa
+ Test specification of a ppa
+ """
+ params = self._get_default_params()
+ cfg = {'source': 'ppa:smoser/cloud-init-test',
+ 'filename': self.aptlistfile}
+
+ # default matcher needed for ppa
+ matcher = re.compile(r'^[\w-]+:\w').search
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_sources([cfg], params, aa_repo_match=matcher)
+ mockobj.assert_called_once_with(['add-apt-repository',
+ 'ppa:smoser/cloud-init-test'])
+
+ # adding ppa should ignore filename (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_ppa_tri(self):
+ """test_apt_src_ppa_tri
+ Test specification of a ppa
+ """
+ params = self._get_default_params()
+ cfg1 = {'source': 'ppa:smoser/cloud-init-test',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'ppa:smoser/cloud-init-test2',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'ppa:smoser/cloud-init-test3',
+ 'filename': self.aptlistfile3}
+
+ # default matcher needed for ppa
+ matcher = re.compile(r'^[\w-]+:\w').search
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_sources([cfg1, cfg2, cfg3], params,
+ aa_repo_match=matcher)
+ calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])]
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ # adding ppa should ignore all filenames (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+ self.assertFalse(os.path.isfile(self.aptlistfile2))
+ self.assertFalse(os.path.isfile(self.aptlistfile3))
+
+ def test_convert_to_new_format(self):
+ """test_convert_to_new_format
+ Test the conversion of old to new format
+ And the noop conversion of new to new format as well
+ """
+ cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
+ 'filename': self.aptlistfile3}
+ checkcfg = {self.aptlistfile: {'filename': self.aptlistfile,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'multiverse'},
+ self.aptlistfile2: {'filename': self.aptlistfile2,
+ 'source': 'deb $MIRROR $RELEASE main'},
+ self.aptlistfile3: {'filename': self.aptlistfile3,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'universe'}}
+
+ newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3])
+ self.assertEqual(newcfg, checkcfg)
+
+ newcfg2 = cc_apt_configure.convert_to_new_format(newcfg)
+ self.assertEqual(newcfg2, checkcfg)
+
+ with self.assertRaises(ValueError):
+ cc_apt_configure.convert_to_new_format(5)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 681f3780..a33ec184 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -125,9 +125,9 @@ class TestSimpleRun(helpers.ResourceUsingTestCase):
def test_seed_runs(self):
test_dicts = []
- for i in range(1, 50):
+ for i in range(1, 10):
base_dicts = []
- for j in range(1, 50):
+ for j in range(1, 10):
base_dicts.append(make_dict(5, i * j))
test_dicts.append(base_dicts)
for test in test_dicts: