summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py97
-rw-r--r--tests/unittests/test__init__.py17
-rw-r--r--tests/unittests/test_cli.py46
-rw-r--r--tests/unittests/test_cs_util.py27
-rw-r--r--tests/unittests/test_datasource/test_azure.py12
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py13
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py2
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py11
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py203
-rw-r--r--tests/unittests/test_datasource/test_gce.py2
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py15
-rw-r--r--tests/unittests/test_datasource/test_smartos.py10
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list.py180
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source.py516
-rw-r--r--tests/unittests/test_net.py277
-rw-r--r--tests/unittests/test_reporting.py4
-rw-r--r--tests/unittests/test_rh_subscription.py22
17 files changed, 1206 insertions, 248 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 50b2bd72..972245df 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -7,13 +7,11 @@ import sys
import tempfile
import unittest
+import mock
import six
+import unittest2
try:
- from unittest import mock
-except ImportError:
- import mock
-try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
@@ -21,6 +19,9 @@ except ImportError:
from cloudinit import helpers as ch
from cloudinit import util
+# Used for skipping tests
+SkipTest = unittest2.SkipTest
+
# Used for detecting different python versions
PY2 = False
PY26 = False
@@ -44,78 +45,6 @@ else:
if _PY_MINOR == 4 and _PY_MICRO < 3:
FIX_HTTPRETTY = True
-if PY26:
- # For now add these on, taken from python 2.7 + slightly adjusted. Drop
- # all this once Python 2.6 is dropped as a minimum requirement.
- class TestCase(unittest.TestCase):
- def setUp(self):
- super(TestCase, self).setUp()
- self.__all_cleanups = ExitStack()
-
- def tearDown(self):
- self.__all_cleanups.close()
- unittest.TestCase.tearDown(self)
-
- def addCleanup(self, function, *args, **kws):
- self.__all_cleanups.callback(function, *args, **kws)
-
- def assertIs(self, expr1, expr2, msg=None):
- if expr1 is not expr2:
- standardMsg = '%r is not %r' % (expr1, expr2)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIn(self, member, container, msg=None):
- if member not in container:
- standardMsg = '%r not found in %r' % (member, container)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertNotIn(self, member, container, msg=None):
- if member in container:
- standardMsg = '%r unexpectedly found in %r'
- standardMsg = standardMsg % (member, container)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIsNone(self, value, msg=None):
- if value is not None:
- standardMsg = '%r is not None'
- standardMsg = standardMsg % (value)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIsInstance(self, obj, cls, msg=None):
- """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
- default message."""
- if not isinstance(obj, cls):
- standardMsg = '%s is not an instance of %r' % (repr(obj), cls)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertDictContainsSubset(self, expected, actual, msg=None):
- missing = []
- mismatched = []
- for k, v in expected.items():
- if k not in actual:
- missing.append(k)
- elif actual[k] != v:
- mismatched.append('%r, expected: %r, actual: %r'
- % (k, v, actual[k]))
-
- if len(missing) == 0 and len(mismatched) == 0:
- return
-
- standardMsg = ''
- if missing:
- standardMsg = 'Missing: %r' % ','.join(m for m in missing)
- if mismatched:
- if standardMsg:
- standardMsg += '; '
- standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
-
- self.fail(self._formatMessage(msg, standardMsg))
-
-
-else:
- class TestCase(unittest.TestCase):
- pass
-
# Makes the old path start
# with new base instead of whatever
@@ -151,6 +80,10 @@ def retarget_many_wrapper(new_base, am, old_func):
return wrapper
+class TestCase(unittest2.TestCase):
+ pass
+
+
class ResourceUsingTestCase(TestCase):
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
@@ -331,6 +264,18 @@ def populate_dir(path, files):
fp.close()
+def dir2dict(startdir, prefix=None):
+ flist = {}
+ if prefix is None:
+ prefix = startdir
+ for root, dirs, files in os.walk(startdir):
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ key = fpath[len(prefix):]
+ flist[key] = util.load_file(fpath)
+ return flist
+
+
try:
skipIf = unittest.skipIf
except AttributeError:
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 153f1658..0154784a 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,16 +1,6 @@
import os
import shutil
import tempfile
-import unittest
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
from cloudinit import handlers
from cloudinit import helpers
@@ -18,7 +8,7 @@ from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
-from .helpers import TestCase
+from .helpers import TestCase, ExitStack, mock
class FakeModule(handlers.Handler):
@@ -99,9 +89,10 @@ class TestWalkerHandleHandler(TestCase):
self.assertEqual(self.data['handlercount'], 0)
-class TestHandlerHandlePart(unittest.TestCase):
+class TestHandlerHandlePart(TestCase):
def setUp(self):
+ super(TestHandlerHandlePart, self).setUp()
self.data = "fake data"
self.ctype = "fake ctype"
self.filename = "fake filename"
@@ -177,7 +168,7 @@ class TestHandlerHandlePart(unittest.TestCase):
self.data, self.ctype, self.filename, self.payload)
-class TestCmdlineUrl(unittest.TestCase):
+class TestCmdlineUrl(TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
index f537bd83..5fa252f7 100644
--- a/tests/unittests/test_cli.py
+++ b/tests/unittests/test_cli.py
@@ -1,17 +1,10 @@
-import imp
-import os
import six
-import sys
from . import helpers as test_helpers
-try:
- from unittest import mock
-except ImportError:
- import mock
+from cloudinit.cmd import main as cli
-
-BIN_CLOUDINIT = "bin/cloud-init"
+mock = test_helpers.mock
class TestCLI(test_helpers.FilesystemMockingTestCase):
@@ -20,35 +13,22 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
super(TestCLI, self).setUp()
self.stderr = six.StringIO()
self.patchStdoutAndStderr(stderr=self.stderr)
- self.sys_exit = mock.MagicMock()
- self.patched_funcs.enter_context(
- mock.patch.object(sys, 'exit', self.sys_exit))
-
- def _call_main(self):
- self.patched_funcs.enter_context(
- mock.patch.object(sys, 'argv', ['cloud-init']))
- cli = imp.load_module(
- 'cli', open(BIN_CLOUDINIT), '', ('', 'r', imp.PY_SOURCE))
+
+ def _call_main(self, sysv_args=None):
+ if not sysv_args:
+ sysv_args = ['cloud-init']
try:
- return cli.main()
- except Exception:
- pass
+ return cli.main(sysv_args=sysv_args)
+ except SystemExit as e:
+ return e.code
- @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
def test_no_arguments_shows_usage(self):
- self._call_main()
- self.assertIn('usage: cloud-init', self.stderr.getvalue())
-
- @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
- def test_no_arguments_exits_2(self):
exit_code = self._call_main()
- if self.sys_exit.call_count:
- self.assertEqual(mock.call(2), self.sys_exit.call_args)
- else:
- self.assertEqual(2, exit_code)
+ self.assertIn('usage: cloud-init', self.stderr.getvalue())
+ self.assertEqual(2, exit_code)
- @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit")
def test_no_arguments_shows_error_message(self):
- self._call_main()
+ exit_code = self._call_main()
self.assertIn('cloud-init: error: too few arguments',
self.stderr.getvalue())
+ self.assertEqual(2, exit_code)
diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py
index d7273035..56c9ce9e 100644
--- a/tests/unittests/test_cs_util.py
+++ b/tests/unittests/test_cs_util.py
@@ -1,21 +1,9 @@
from __future__ import print_function
-import sys
-import unittest
+from . import helpers as test_helpers
from cloudinit.cs_utils import Cepko
-try:
- skip = unittest.skip
-except AttributeError:
- # Python 2.6. Doesn't have to be high fidelity.
- def skip(reason):
- def decorator(func):
- def wrapper(*args, **kws):
- print(reason, file=sys.stderr)
- return wrapper
- return decorator
-
SERVER_CONTEXT = {
"cpu": 1000,
@@ -43,18 +31,9 @@ class CepkoMock(Cepko):
# 2015-01-22 BAW: This test is completely useless because it only ever tests
# the CepkoMock object. Even in its original form, I don't think it ever
# touched the underlying Cepko class methods.
-@skip('This test is completely useless')
-class CepkoResultTests(unittest.TestCase):
+class CepkoResultTests(test_helpers.TestCase):
def setUp(self):
- pass
- # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
- # spec=CepkoMock,
- # count=False,
- # passthrough=False)
- # self.mocked()
- # self.mocker.result(CepkoMock())
- # self.mocker.replay()
- # self.c = Cepko()
+ raise test_helpers.SkipTest('This test is completely useless')
def test_getitem(self):
result = self.c.all()
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 5f3eb31f..e90e903c 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,16 +1,8 @@
from cloudinit import helpers
from cloudinit.util import b64e, decode_binary, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import TestCase, populate_dir
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
import crypt
import os
@@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase):
def setUp(self):
super(TestAzureDataSource, self).setUp()
+ if PY26:
+ raise SkipTest("Does not work on python 2.6")
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index d07a1f07..65202ff0 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -2,17 +2,7 @@ import os
from cloudinit.sources.helpers import azure as azure_helper
-from ..helpers import TestCase
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import ExitStack, mock, TestCase
GOAL_STATE_TEMPLATE = """\
@@ -288,6 +278,7 @@ class TestOpenSSLManager(TestCase):
self.subp.side_effect = capture_directory
manager = azure_helper.OpenSSLManager()
self.assertEqual(manager.tmpdir, subp_directory['path'])
+ manager.clean_up()
@mock.patch.object(azure_helper, 'cd', mock.MagicMock())
@mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 772d189a..2a42ce0c 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -1,4 +1,5 @@
# coding: utf-8
+
import copy
from cloudinit.cs_utils import Cepko
@@ -6,7 +7,6 @@ from cloudinit.sources import DataSourceCloudSigma
from .. import helpers as test_helpers
-
SERVER_CONTEXT = {
"cpu": 1000,
"cpus_instead_of_cores": False,
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index 974b3704..b1aab17b 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -1,16 +1,7 @@
from cloudinit import helpers
from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
-from ..helpers import TestCase
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, mock, ExitStack
class TestCloudStackPasswordFetching(TestCase):
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 1364b39d..18551b92 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -5,22 +5,15 @@ import shutil
import six
import tempfile
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
-
from cloudinit import helpers
+from cloudinit.net import eni
+from cloudinit.net import network_state
from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from ..helpers import TestCase
+from ..helpers import TestCase, ExitStack, mock
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -88,9 +81,34 @@ NETWORK_DATA = {
]
}
+NETWORK_DATA_2 = {
+ "services": [
+ {"type": "dns", "address": "1.1.1.191"},
+ {"type": "dns", "address": "1.1.1.4"}],
+ "networks": [
+ {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4",
+ "netmask": "255.255.255.248", "link": "eth0",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "2.2.2.9"}],
+ "ip_address": "2.2.2.10", "id": "network0-ipv4"},
+ {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4",
+ "netmask": "255.255.255.224", "link": "eth1",
+ "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}],
+ "links": [
+ {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500,
+ "type": "vif", "id": "eth0", "vif_id": "vif-foo1"},
+ {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500,
+ "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}]
+}
+
+
KNOWN_MACS = {
'fa:16:3e:69:b0:58': 'enp0s1',
- 'fa:16:3e:d4:57:ad': 'enp0s2'}
+ 'fa:16:3e:d4:57:ad': 'enp0s2',
+ 'fa:16:3e:dd:50:9a': 'foo1',
+ 'fa:16:3e:a8:14:69': 'foo2',
+ 'fa:16:3e:ed:9a:59': 'foo3',
+}
CFG_DRIVE_FILES_V2 = {
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
@@ -352,35 +370,150 @@ class TestConfigDriveDataSource(TestCase):
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
- def test_pubkeys_v2(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
- def test_network_data_is_found(self):
+
+class TestNetJson(TestCase):
+ def setUp(self):
+ super(TestNetJson, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.maxDiff = None
+
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- self.assertEqual(myds.network_json, NETWORK_DATA)
+ self.assertIsNotNone(myds.network_json)
- def test_network_config_is_converted(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- network_config = ds.convert_network_data(NETWORK_DATA,
- known_macs=KNOWN_MACS)
+ network_config = openstack.convert_net_json(NETWORK_DATA,
+ known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
+ def test_network_config_conversions(self):
+ """Tests a bunch of input network json and checks the
+ expected conversions."""
+ in_datas = [
+ NETWORK_DATA,
+ {
+ 'services': [{'type': 'dns', 'address': '172.19.0.12'}],
+ 'networks': [{
+ 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4',
+ 'type': 'ipv4',
+ 'netmask': '255.255.252.0',
+ 'link': 'tap1a81968a-79',
+ 'routes': [{
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ 'gateway': '172.19.3.254',
+ }],
+ 'ip_address': '172.19.1.34',
+ 'id': 'network0',
+ }],
+ 'links': [{
+ 'type': 'bridge',
+ 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f',
+ 'ethernet_mac_address': 'fa:16:3e:ed:9a:59',
+ 'id': 'tap1a81968a-79',
+ 'mtu': None,
+ }],
+ },
+ ]
+ out_datas = [
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:69:b0:58',
+ 'name': 'enp0s1',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:d4:57:ad',
+ 'name': 'enp0s2',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:05:30:fe',
+ 'name': 'nic0',
+ 'mtu': None,
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.44.24',
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.47.54',
+ }
+ ],
+
+ },
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'name': 'foo3',
+ 'mac_address': 'fa:16:3e:ed:9a:59',
+ 'mtu': None,
+ 'type': 'physical',
+ 'subnets': [
+ {
+ 'address': '172.19.1.34',
+ 'netmask': '255.255.252.0',
+ 'type': 'static',
+ 'ipv4': True,
+ 'routes': [{
+ 'gateway': '172.19.3.254',
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ }],
+ }
+ ]
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '172.19.0.12',
+ }
+ ],
+ },
+ ]
+ for in_data, out_data in zip(in_datas, out_datas):
+ conv_data = openstack.convert_net_json(in_data,
+ known_macs=KNOWN_MACS)
+ self.assertEqual(out_data, conv_data)
+
class TestConvertNetworkData(TestCase):
+ def setUp(self):
+ super(TestConvertNetworkData, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
def _getnames_in_config(self, ncfg):
return set([n['name'] for n in ncfg['config']
if n['type'] == 'physical'])
def test_conversion_fills_names(self):
- ncfg = ds.convert_network_data(NETWORK_DATA, known_macs=KNOWN_MACS)
+ ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS)
expected = set(['nic0', 'enp0s1', 'enp0s2'])
found = self._getnames_in_config(ncfg)
self.assertEqual(found, expected)
@@ -392,22 +525,45 @@ class TestConvertNetworkData(TestCase):
'fa:16:3e:69:b0:58': 'ens1'})
get_interfaces_by_mac.return_value = macs
- ncfg = ds.convert_network_data(NETWORK_DATA)
+ ncfg = openstack.convert_net_json(NETWORK_DATA)
expected = set(['nic0', 'ens1', 'enp0s2'])
found = self._getnames_in_config(ncfg)
self.assertEqual(found, expected)
def test_convert_raises_value_error_on_missing_name(self):
macs = {'aa:aa:aa:aa:aa:00': 'ens1'}
- self.assertRaises(ValueError, ds.convert_network_data,
+ self.assertRaises(ValueError, openstack.convert_net_json,
NETWORK_DATA, known_macs=macs)
+ def test_conversion_with_route(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA_2,
+ known_macs=KNOWN_MACS)
+ # not the best test, but see that we get a route in the
+ # network config and that it gets rendered to an ENI file
+ routes = []
+ for n in ncfg['config']:
+ for s in n.get('subnets', []):
+ routes.extend(s.get('routes', []))
+ self.assertIn(
+ {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'},
+ routes)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+ self.assertIn("route add default gw 2.2.2.9", eni_rendering)
+
def cfg_ds_from_dir(seed_d):
- found = ds.read_config_drive(seed_d)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
helpers.Paths({}))
- populate_ds_from_read_config(cfg_ds, seed_d, found)
+ cfg_ds.seed_dir = seed_d
+ cfg_ds.known_macs = KNOWN_MACS.copy()
+ if not cfg_ds.get_data():
+ raise RuntimeError("Data source did not extract itself from"
+ " seed directory %s" % seed_d)
return cfg_ds
@@ -421,7 +577,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.userdata_raw = results.get('userdata')
cfg_ds.version = results.get('version')
cfg_ds.network_json = results.get('networkdata')
- cfg_ds._network_config = ds.convert_network_data(
+ cfg_ds._network_config = openstack.convert_net_json(
cfg_ds.network_json, known_macs=KNOWN_MACS)
@@ -435,7 +591,6 @@ def populate_dir(seed_dir, files):
mode = "w"
else:
mode = "wb"
-
with open(path, mode) as fp:
fp.write(content)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 1f7eb99e..6e62a4d2 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -52,7 +52,7 @@ GCE_META_ENCODING = {
HEADERS = {'X-Google-Metadata-Request': 'True'}
MD_URL_RE = re.compile(
- r'http://metadata.google.internal./computeMetadata/v1/.*')
+ r'http://metadata.google.internal/computeMetadata/v1/.*')
def _set_mock_metadata(gce_meta=None):
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 3c528c23..b0fa1130 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,22 +1,13 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import TestCase, populate_dir
+from ..helpers import TestCase, populate_dir, mock, ExitStack
import os
import shutil
import tempfile
-import unittest
-import yaml
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+import yaml
class TestNoCloudDataSource(TestCase):
@@ -139,7 +130,7 @@ class TestNoCloudDataSource(TestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(unittest.TestCase):
+class TestParseCommandLineData(TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index c809117b..9c6c8768 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -34,11 +34,12 @@ import stat
import tempfile
import uuid
-import serial
+from cloudinit import serial
+from cloudinit.sources import DataSourceSmartOS
+
import six
from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
from ..helpers import mock, FilesystemMockingTestCase, TestCase
@@ -146,7 +147,9 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.addCleanup(shutil.rmtree, self.tmp)
self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
- self.legacy_user_d = tempfile.mkdtemp()
+ self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ os.mkdir(self.legacy_user_d)
+
self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
@@ -378,6 +381,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
+
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
new file mode 100644
index 00000000..acde0863
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
@@ -0,0 +1,180 @@
+""" test_handler_apt_configure_sources_list
+Test templating of sources list
+"""
+import logging
+import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import templater
+from cloudinit import util
+
+from cloudinit.config import cc_apt_configure
+from cloudinit.sources import DataSourceNone
+
+from cloudinit.distros.debian import Distro
+
+from .. import helpers as t_help
+
+LOG = logging.getLogger(__name__)
+
+YAML_TEXT_CUSTOM_SL = """
+apt_mirror: http://archive.ubuntu.com/ubuntu/
+apt_custom_sources_list: |
+ ## template:jinja
+ ## Note, this file is written by cloud-init on first boot of an instance
+ ## modifications made here will not survive a re-bundle.
+ ## if you wish to make changes you can:
+ ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
+ ## or do the same in user-data
+ ## b.) add sources in /etc/apt/sources.list.d
+ ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
+
+ # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+ # newer versions of the distribution.
+ deb {{mirror}} {{codename}} main restricted
+ deb-src {{mirror}} {{codename}} main restricted
+ # FIND_SOMETHING_SPECIAL
+"""
+
+EXPECTED_CONVERTED_CONTENT = (
+ """## Note, this file is written by cloud-init on first boot of an instance
+## modifications made here will not survive a re-bundle.
+## if you wish to make changes you can:
+## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
+## or do the same in user-data
+## b.) add sources in /etc/apt/sources.list.d
+## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
+
+# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
+# newer versions of the distribution.
+deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
+deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
+# FIND_SOMETHING_SPECIAL
+""")
+
+
+def load_tfile_or_url(*args, **kwargs):
+ """load_tfile_or_url
+ load file and return content after decoding
+ """
+ return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+
+
+class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
+ """TestAptSourceConfigSourceList
+ Main Class to test sources list rendering
+ """
+ def setUp(self):
+ super(TestAptSourceConfigSourceList, self).setUp()
+ self.subp = util.subp
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def apt_source_list(self, distro, mirror, mirrorcheck=None):
+ """apt_source_list
+ Test rendering of a source.list from template for a given distro
+ """
+ if mirrorcheck is None:
+ mirrorcheck = mirror
+
+ if isinstance(mirror, list):
+ cfg = {'apt_mirror_search': mirror}
+ else:
+ cfg = {'apt_mirror': mirror}
+ mycloud = self._get_cloud(distro)
+
+ with mock.patch.object(templater, 'render_to_file') as mocktmpl:
+ with mock.patch.object(os.path, 'isfile',
+ return_value=True) as mockisfile:
+ with mock.patch.object(util, 'rename'):
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
+
+ mockisfile.assert_any_call(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
+ mocktmpl.assert_called_once_with(
+ ('/etc/cloud/templates/sources.list.%s.tmpl' % distro),
+ '/etc/apt/sources.list',
+ {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck})
+
+ def test_apt_source_list_debian(self):
+ """Test rendering of a source.list from template for debian"""
+ self.apt_source_list('debian', 'http://httpredir.debian.org/debian')
+
+ def test_apt_source_list_ubuntu(self):
+ """Test rendering of a source.list from template for ubuntu"""
+ self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/')
+
+ @staticmethod
+ def myresolve(name):
+ """Fake util.is_resolvable for mirrorfail tests"""
+ if name == "does.not.exist":
+ print("Faking FAIL for '%s'" % name)
+ return False
+ else:
+ print("Faking SUCCESS for '%s'" % name)
+ return True
+
+ def test_apt_srcl_debian_mirrorfail(self):
+ """Test rendering of a source.list from template for debian"""
+ with mock.patch.object(util, 'is_resolvable',
+ side_effect=self.myresolve) as mockresolve:
+ self.apt_source_list('debian',
+ ['http://does.not.exist',
+ 'http://httpredir.debian.org/debian'],
+ 'http://httpredir.debian.org/debian')
+ mockresolve.assert_any_call("does.not.exist")
+ mockresolve.assert_any_call("httpredir.debian.org")
+
+ def test_apt_srcl_ubuntu_mirrorfail(self):
+ """Test rendering of a source.list from template for ubuntu"""
+ with mock.patch.object(util, 'is_resolvable',
+ side_effect=self.myresolve) as mockresolve:
+ self.apt_source_list('ubuntu',
+ ['http://does.not.exist',
+ 'http://archive.ubuntu.com/ubuntu/'],
+ 'http://archive.ubuntu.com/ubuntu/')
+ mockresolve.assert_any_call("does.not.exist")
+ mockresolve.assert_any_call("archive.ubuntu.com")
+
+ def test_apt_srcl_custom(self):
+ """Test rendering from a custom source.list template"""
+ cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL)
+ mycloud = self._get_cloud('ubuntu')
+
+ # the second mock restores the original subp
+ with mock.patch.object(util, 'write_file') as mockwrite:
+ with mock.patch.object(util, 'subp', self.subp):
+ with mock.patch.object(cc_apt_configure, 'get_release',
+ return_value='fakerelease'):
+ with mock.patch.object(Distro, 'get_primary_arch',
+ return_value='amd64'):
+ cc_apt_configure.handle("notimportant", cfg, mycloud,
+ LOG, None)
+
+ mockwrite.assert_called_once_with(
+ '/etc/apt/sources.list',
+ EXPECTED_CONVERTED_CONTENT,
+ mode=420)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py
new file mode 100644
index 00000000..99a4d860
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_source.py
@@ -0,0 +1,516 @@
+""" test_handler_apt_source
+Testing various config variations of the apt_source config
+"""
+import os
+import re
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+from mock import call
+
+from cloudinit.config import cc_apt_configure
+from cloudinit import gpg
+from cloudinit import util
+
+from ..helpers import TestCase
+
+EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ
+NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy
+8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0
+HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG
+CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29
+OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ
+FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm
+S0ORP6HXET3+jC8BMG4tBWCTK/XEZw==
+=ACB2
+-----END PGP PUBLIC KEY BLOCK-----"""
+
+
+def load_tfile_or_url(*args, **kwargs):
+ """load_tfile_or_url
+ load file and return content after decoding
+ """
+ return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
+
+
+class TestAptSourceConfig(TestCase):
+ """TestAptSourceConfig
+ Main Class to test apt_source configs
+ """
+ release = "fantastic"
+
+ def setUp(self):
+ super(TestAptSourceConfig, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.aptlistfile = os.path.join(self.tmp, "single-deb.list")
+ self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list")
+ self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
+ self.join = os.path.join
+ # mock fallback filename into writable tmp dir
+ self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/",
+ "cloud_config_sources.list")
+
+ patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release")
+ get_rel = patcher.start()
+ get_rel.return_value = self.release
+ self.addCleanup(patcher.stop)
+
+ @staticmethod
+ def _get_default_params():
+ """get_default_params
+ Get the most basic default mrror and release info to be used in tests
+ """
+ params = {}
+ params['RELEASE'] = cc_apt_configure.get_release()
+ params['MIRROR'] = "http://archive.ubuntu.com/ubuntu"
+ return params
+
+ def myjoin(self, *args, **kwargs):
+ """myjoin - redir into writable tmpdir"""
+ if (args[0] == "/etc/apt/sources.list.d/" and
+ args[1] == "cloud_config_sources.list" and
+ len(args) == 2):
+ return self.join(self.tmp, args[0].lstrip("/"), args[1])
+ else:
+ return self.join(*args, **kwargs)
+
+ def apt_src_basic(self, filename, cfg):
+ """apt_src_basic
+ Test Fix deb source string, has to overwrite mirror conf in params
+ """
+ params = self._get_default_params()
+
+ cc_apt_configure.add_apt_sources(cfg, params)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "karmic-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_basic(self):
+ """Test deb source string, overwrite mirror and filename"""
+ cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile}
+ self.apt_src_basic(self.aptlistfile, [cfg])
+
+ def test_apt_src_basic_dict(self):
+ """Test deb source string, overwrite mirror and filename (dict)"""
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')}}
+ self.apt_src_basic(self.aptlistfile, cfg)
+
+ def apt_src_basic_tri(self, cfg):
+ """apt_src_basic_tri
+ Test Fix three deb source string, has to overwrite mirror conf in
+ params. Test with filenames provided in config.
+ generic part to check three files with different content
+ """
+ self.apt_src_basic(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "precise-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", "http://archive.ubuntu.com/ubuntu",
+ "lucid-backports",
+ "main universe multiverse restricted"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_basic_tri(self):
+ """Test Fix three deb source string with filenames"""
+ cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' precise-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' lucid-backports'
+ ' main universe multiverse restricted'),
+ 'filename': self.aptlistfile3}
+ self.apt_src_basic_tri([cfg1, cfg2, cfg3])
+
+ def test_apt_src_basic_dict_tri(self):
+ """Test Fix three deb source string with filenames (dict)"""
+ cfg = {self.aptlistfile: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile2: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' precise-backports'
+ ' main universe multiverse restricted')},
+ self.aptlistfile3: {'source':
+ ('deb http://archive.ubuntu.com/ubuntu'
+ ' lucid-backports'
+ ' main universe multiverse restricted')}}
+ self.apt_src_basic_tri(cfg)
+
+ def test_apt_src_basic_nofn(self):
+ """Test Fix three deb source string without filenames (dict)"""
+ cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
+ ' karmic-backports'
+ ' main universe multiverse restricted')}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_basic(self.fallbackfn, [cfg])
+
+ def apt_src_replacement(self, filename, cfg):
+ """apt_src_replace
+ Test Autoreplacement of MIRROR and RELEASE in source specs
+ """
+ params = self._get_default_params()
+ cc_apt_configure.add_apt_sources(cfg, params)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_replace(self):
+ """Test Autoreplacement of MIRROR and RELEASE in source specs"""
+ cfg = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ self.apt_src_replacement(self.aptlistfile, [cfg])
+
+ def apt_src_replace_tri(self, cfg):
+ """apt_src_replace_tri
+ Test three autoreplacements of MIRROR and RELEASE in source specs with
+ generic part
+ """
+ self.apt_src_replacement(self.aptlistfile, cfg)
+
+ # extra verify on two extra files of this test
+ params = self._get_default_params()
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "main"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb", params['MIRROR'], params['RELEASE'],
+ "universe"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_replace_tri(self):
+ """Test triple Autoreplacement of MIRROR and RELEASE in source specs"""
+ cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
+ 'filename': self.aptlistfile3}
+ self.apt_src_replace_tri([cfg1, cfg2, cfg3])
+
+ def test_apt_src_replace_dict_tri(self):
+ """Test triple Autoreplacement in source specs (dict)"""
+ cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'},
+ 'notused': {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2},
+ self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}}
+ self.apt_src_replace_tri(cfg)
+
+ def test_apt_src_replace_nofn(self):
+ """Test Autoreplacement of MIRROR and RELEASE in source specs nofile"""
+ cfg = {'source': 'deb $MIRROR $RELEASE multiverse'}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_replacement(self.fallbackfn, [cfg])
+
+ def apt_src_keyid(self, filename, cfg, keynum):
+ """apt_src_keyid
+ Test specification of a source + keyid
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(util, 'subp',
+ return_value=('fakekey 1234', '')) as mockobj:
+ cc_apt_configure.add_apt_sources(cfg, params)
+
+ # check if it added the right ammount of keys
+ calls = []
+ for _ in range(keynum):
+ calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234'))
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_keyid(self):
+ """Test specification of a source + keyid with filename being set"""
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+ self.apt_src_keyid(self.aptlistfile, [cfg], 1)
+
+ def test_apt_src_keyid_tri(self):
+ """Test 3x specification of a source + keyid with filename being set"""
+ cfg1 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial universe'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial multiverse'),
+ 'keyid': "03683F77",
+ 'filename': self.aptlistfile3}
+
+ self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3)
+ contents = load_tfile_or_url(self.aptlistfile2)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "universe"),
+ contents, flags=re.IGNORECASE))
+ contents = load_tfile_or_url(self.aptlistfile3)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "multiverse"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_keyid_nofn(self):
+ """Test specification of a source + keyid without filename being set"""
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'keyid': "03683F77"}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_keyid(self.fallbackfn, [cfg], 1)
+
+ def apt_src_key(self, filename, cfg):
+ """apt_src_key
+ Test specification of a source + key
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_apt_sources([cfg], params)
+
+ mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321')
+
+ self.assertTrue(os.path.isfile(filename))
+
+ contents = load_tfile_or_url(filename)
+ self.assertTrue(re.search(r"%s %s %s %s\n" %
+ ("deb",
+ ('http://ppa.launchpad.net/smoser/'
+ 'cloud-init-test/ubuntu'),
+ "xenial", "main"),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_src_key(self):
+ """Test specification of a source + key with filename being set"""
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'key': "fakekey 4321",
+ 'filename': self.aptlistfile}
+ self.apt_src_key(self.aptlistfile, cfg)
+
+ def test_apt_src_key_nofn(self):
+ """Test specification of a source + key without filename being set"""
+ cfg = {'source': ('deb '
+ 'http://ppa.launchpad.net/'
+ 'smoser/cloud-init-test/ubuntu'
+ ' xenial main'),
+ 'key': "fakekey 4321"}
+ with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
+ self.apt_src_key(self.fallbackfn, cfg)
+
+ def test_apt_src_keyonly(self):
+ """Test specifying key without source"""
+ params = self._get_default_params()
+ cfg = {'key': "fakekey 4242",
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_apt_sources([cfg], params)
+
+ mockobj.assert_called_once_with(('apt-key', 'add', '-'),
+ 'fakekey 4242')
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_keyidonly(self):
+ """Test specification of a keyid without source"""
+ params = self._get_default_params()
+ cfg = {'keyid': "03683F77",
+ 'filename': self.aptlistfile}
+
+ with mock.patch.object(util, 'subp',
+ return_value=('fakekey 1212', '')) as mockobj:
+ cc_apt_configure.add_apt_sources([cfg], params)
+
+ mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212')
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def apt_src_keyid_real(self, cfg, expectedkey):
+ """apt_src_keyid_real
+ Test specification of a keyid without source including
+ up to addition of the key (add_apt_key_raw mocked to keep the
+ environment as is)
+ """
+ params = self._get_default_params()
+
+ with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey:
+ with mock.patch.object(gpg, 'get_key_by_id',
+ return_value=expectedkey) as mockgetkey:
+ cc_apt_configure.add_apt_sources([cfg], params)
+
+ mockgetkey.assert_called_with(cfg['keyid'],
+ cfg.get('keyserver',
+ 'keyserver.ubuntu.com'))
+ mockkey.assert_called_with(expectedkey)
+
+ # filename should be ignored on key only
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_keyid_real(self):
+ """test_apt_src_keyid_real - Test keyid including key add"""
+ keyid = "03683F77"
+ cfg = {'keyid': keyid,
+ 'filename': self.aptlistfile}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_src_longkeyid_real(self):
+ """test_apt_src_longkeyid_real - Test long keyid including key add"""
+ keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
+ cfg = {'keyid': keyid,
+ 'filename': self.aptlistfile}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_src_longkeyid_ks_real(self):
+ """test_apt_src_longkeyid_ks_real - Test long keyid from other ks"""
+ keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
+ cfg = {'keyid': keyid,
+ 'keyserver': 'keys.gnupg.net',
+ 'filename': self.aptlistfile}
+
+ self.apt_src_keyid_real(cfg, EXPECTEDKEY)
+
+ def test_apt_src_ppa(self):
+ """Test adding a ppa"""
+ params = self._get_default_params()
+ cfg = {'source': 'ppa:smoser/cloud-init-test',
+ 'filename': self.aptlistfile}
+
+ # default matcher needed for ppa
+ matcher = re.compile(r'^[\w-]+:\w').search
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_apt_sources([cfg], params,
+ aa_repo_match=matcher)
+ mockobj.assert_called_once_with(['add-apt-repository',
+ 'ppa:smoser/cloud-init-test'])
+
+ # adding ppa should ignore filename (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+
+ def test_apt_src_ppa_tri(self):
+ """Test adding three ppa's"""
+ params = self._get_default_params()
+ cfg1 = {'source': 'ppa:smoser/cloud-init-test',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'ppa:smoser/cloud-init-test2',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'ppa:smoser/cloud-init-test3',
+ 'filename': self.aptlistfile3}
+
+ # default matcher needed for ppa
+ matcher = re.compile(r'^[\w-]+:\w').search
+
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params,
+ aa_repo_match=matcher)
+ calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']),
+ call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])]
+ mockobj.assert_has_calls(calls, any_order=True)
+
+ # adding ppa should ignore all filenames (uses add-apt-repository)
+ self.assertFalse(os.path.isfile(self.aptlistfile))
+ self.assertFalse(os.path.isfile(self.aptlistfile2))
+ self.assertFalse(os.path.isfile(self.aptlistfile3))
+
+ def test_convert_to_new_format(self):
+ """Test the conversion of old to new format"""
+ cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
+ 'filename': self.aptlistfile}
+ cfg2 = {'source': 'deb $MIRROR $RELEASE main',
+ 'filename': self.aptlistfile2}
+ cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
+ 'filename': self.aptlistfile3}
+ checkcfg = {self.aptlistfile: {'filename': self.aptlistfile,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'multiverse'},
+ self.aptlistfile2: {'filename': self.aptlistfile2,
+ 'source': 'deb $MIRROR $RELEASE main'},
+ self.aptlistfile3: {'filename': self.aptlistfile3,
+ 'source': 'deb $MIRROR $RELEASE '
+ 'universe'}}
+
+ newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3])
+ self.assertEqual(newcfg, checkcfg)
+
+ newcfg2 = cc_apt_configure.convert_to_new_format(newcfg)
+ self.assertEqual(newcfg2, checkcfg)
+
+ with self.assertRaises(ValueError):
+ cc_apt_configure.convert_to_new_format(5)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 34875f7b..a9268d30 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1,7 +1,14 @@
from cloudinit import net
+from cloudinit.net import cmdline
+from cloudinit.net import eni
+from cloudinit.net import network_state
+from cloudinit.net import sysconfig
+from cloudinit.sources.helpers import openstack
from cloudinit import util
+from .helpers import mock
from .helpers import TestCase
+from .helpers import dir2dict
import base64
import copy
@@ -9,9 +16,10 @@ import gzip
import io
import json
import os
+import shutil
+import tempfile
import yaml
-
DHCP_CONTENT_1 = """
DEVICE='eth0'
PROTO='dhcp'
@@ -70,6 +78,71 @@ STATIC_EXPECTED_1 = {
'dns_nameservers': ['10.0.1.1']}],
}
+# Examples (and expected outputs for various renderers).
+OS_SAMPLES = [
+ {
+ 'in_data': {
+ "services": [{"type": "dns", "address": "172.19.0.12"}],
+ "networks": [{
+ "network_id": "dacd568d-5be6-4786-91fe-750c374b78b4",
+ "type": "ipv4", "netmask": "255.255.252.0",
+ "link": "tap1a81968a-79",
+ "routes": [{
+ "netmask": "0.0.0.0",
+ "network": "0.0.0.0",
+ "gateway": "172.19.3.254",
+ }],
+ "ip_address": "172.19.1.34", "id": "network0"
+ }],
+ "links": [
+ {
+ "ethernet_mac_address": "fa:16:3e:ed:9a:59",
+ "mtu": None, "type": "bridge", "id":
+ "tap1a81968a-79",
+ "vif_id": "1a81968a-797a-400f-8a80-567f997eb93f"
+ },
+ ],
+ },
+ 'in_macs': {
+ 'fa:16:3e:ed:9a:59': 'eth0',
+ },
+ 'out_sysconfig': [
+ ('etc/sysconfig/network-scripts/ifcfg-eth0',
+ """
+# Created by cloud-init on instance boot automatically, do not edit.
+#
+BOOTPROTO=static
+DEFROUTE=yes
+DEVICE=eth0
+GATEWAY=172.19.3.254
+HWADDR=fa:16:3e:ed:9a:59
+IPADDR=172.19.1.34
+NETMASK=255.255.252.0
+NM_CONTROLLED=no
+ONBOOT=yes
+TYPE=Ethernet
+USERCTL=no
+""".lstrip()),
+ ('etc/sysconfig/network-scripts/route-eth0',
+ """
+# Created by cloud-init on instance boot automatically, do not edit.
+#
+ADDRESS0=0.0.0.0
+GATEWAY0=172.19.3.254
+NETMASK0=0.0.0.0
+""".lstrip()),
+ ('etc/resolv.conf',
+ """
+; Created by cloud-init on instance boot automatically, do not edit.
+;
+nameserver 172.19.0.12
+""".lstrip()),
+ ('etc/udev/rules.d/70-persistent-net.rules',
+ "".join(['SUBSYSTEM=="net", ACTION=="add", DRIVERS=="?*", ',
+ 'ATTR{address}=="fa:16:3e:ed:9a:59", NAME="eth0"\n']))]
+ }
+]
+
EXAMPLE_ENI = """
auto lo
iface lo inet loopback
@@ -228,21 +301,148 @@ config:
"""
-class TestNetConfigParsing(TestCase):
+def _setup_test(tmp_dir, mock_get_devicelist, mock_sys_netdev_info,
+ mock_sys_dev_path):
+ mock_get_devicelist.return_value = ['eth1000']
+ dev_characteristics = {
+ 'eth1000': {
+ "bridge": False,
+ "carrier": False,
+ "dormant": False,
+ "operstate": "down",
+ "address": "07-1C-C6-75-A4-BE",
+ }
+ }
+
+ def netdev_info(name, field):
+ return dev_characteristics[name][field]
+
+ mock_sys_netdev_info.side_effect = netdev_info
+
+ def sys_dev_path(devname, path=""):
+ return tmp_dir + devname + "/" + path
+
+ for dev in dev_characteristics:
+ os.makedirs(os.path.join(tmp_dir, dev))
+ with open(os.path.join(tmp_dir, dev, 'operstate'), 'w') as fh:
+ fh.write("down")
+
+ mock_sys_dev_path.side_effect = sys_dev_path
+
+
+class TestSysConfigRendering(TestCase):
+
+ @mock.patch("cloudinit.net.sys_dev_path")
+ @mock.patch("cloudinit.net.sys_netdev_info")
+ @mock.patch("cloudinit.net.get_devicelist")
+ def test_default_generation(self, mock_get_devicelist,
+ mock_sys_netdev_info,
+ mock_sys_dev_path):
+ tmp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmp_dir)
+ _setup_test(tmp_dir, mock_get_devicelist,
+ mock_sys_netdev_info, mock_sys_dev_path)
+
+ network_cfg = net.generate_fallback_config()
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+
+ render_dir = os.path.join(tmp_dir, "render")
+ os.makedirs(render_dir)
+
+ renderer = sysconfig.Renderer()
+ renderer.render_network_state(render_dir, ns)
+
+ render_file = 'etc/sysconfig/network-scripts/ifcfg-eth1000'
+ with open(os.path.join(render_dir, render_file)) as fh:
+ content = fh.read()
+ expected_content = """
+# Created by cloud-init on instance boot automatically, do not edit.
+#
+BOOTPROTO=dhcp
+DEVICE=eth1000
+HWADDR=07-1C-C6-75-A4-BE
+NM_CONTROLLED=no
+ONBOOT=yes
+TYPE=Ethernet
+USERCTL=no
+""".lstrip()
+ self.assertEqual(expected_content, content)
+
+ def test_openstack_rendering_samples(self):
+ tmp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmp_dir)
+ render_dir = os.path.join(tmp_dir, "render")
+ for os_sample in OS_SAMPLES:
+ ex_input = os_sample['in_data']
+ ex_mac_addrs = os_sample['in_macs']
+ network_cfg = openstack.convert_net_json(
+ ex_input, known_macs=ex_mac_addrs)
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+ renderer = sysconfig.Renderer()
+ renderer.render_network_state(render_dir, ns)
+ for fn, expected_content in os_sample.get('out_sysconfig', []):
+ with open(os.path.join(render_dir, fn)) as fh:
+ self.assertEqual(expected_content, fh.read())
+
+
+class TestEniNetRendering(TestCase):
+
+ @mock.patch("cloudinit.net.sys_dev_path")
+ @mock.patch("cloudinit.net.sys_netdev_info")
+ @mock.patch("cloudinit.net.get_devicelist")
+ def test_default_generation(self, mock_get_devicelist,
+ mock_sys_netdev_info,
+ mock_sys_dev_path):
+ tmp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmp_dir)
+ _setup_test(tmp_dir, mock_get_devicelist,
+ mock_sys_netdev_info, mock_sys_dev_path)
+
+ network_cfg = net.generate_fallback_config()
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+
+ render_dir = os.path.join(tmp_dir, "render")
+ os.makedirs(render_dir)
+
+ renderer = eni.Renderer(
+ {'links_path_prefix': None,
+ 'eni_path': 'interfaces', 'netrules_path': None,
+ })
+ renderer.render_network_state(render_dir, ns)
+
+ self.assertTrue(os.path.exists(os.path.join(render_dir,
+ 'interfaces')))
+ with open(os.path.join(render_dir, 'interfaces')) as fh:
+ contents = fh.read()
+
+ expected = """
+auto lo
+iface lo inet loopback
+
+auto eth1000
+iface eth1000 inet dhcp
+"""
+ self.assertEqual(expected.lstrip(), contents.lstrip())
+
+
+class TestCmdlineConfigParsing(TestCase):
simple_cfg = {
'config': [{"type": "physical", "name": "eth0",
"mac_address": "c0:d6:9f:2c:e8:80",
"subnets": [{"type": "dhcp"}]}]}
- def test_klibc_convert_dhcp(self):
- found = net._klibc_to_config_entry(DHCP_CONTENT_1)
+ def test_cmdline_convert_dhcp(self):
+ found = cmdline._klibc_to_config_entry(DHCP_CONTENT_1)
self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
- def test_klibc_convert_static(self):
- found = net._klibc_to_config_entry(STATIC_CONTENT_1)
+ def test_cmdline_convert_static(self):
+ found = cmdline._klibc_to_config_entry(STATIC_CONTENT_1)
self.assertEqual(found, ('eth1', STATIC_EXPECTED_1))
- def test_config_from_klibc_net_cfg(self):
+ def test_config_from_cmdline_net_cfg(self):
files = []
pairs = (('net-eth0.cfg', DHCP_CONTENT_1),
('net-eth1.cfg', STATIC_CONTENT_1))
@@ -263,45 +463,72 @@ class TestNetConfigParsing(TestCase):
files.append(fp)
util.write_file(fp, content)
- found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs)
+ found = cmdline.config_from_klibc_net_cfg(files=files,
+ mac_addrs=macs)
self.assertEqual(found, expected)
def test_cmdline_with_b64(self):
data = base64.b64encode(json.dumps(self.simple_cfg).encode())
encoded_text = data.decode()
- cmdline = 'ro network-config=' + encoded_text + ' root=foo'
- found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline)
self.assertEqual(found, self.simple_cfg)
def test_cmdline_with_b64_gz(self):
data = _gzip_data(json.dumps(self.simple_cfg).encode())
encoded_text = base64.b64encode(data).decode()
- cmdline = 'ro network-config=' + encoded_text + ' root=foo'
- found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline)
self.assertEqual(found, self.simple_cfg)
class TestEniRoundTrip(TestCase):
+ def setUp(self):
+ super(TestCase, self).setUp()
+ self.tmp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp_dir)
+
+ def _render_and_read(self, network_config=None, state=None, eni_path=None,
+ links_prefix=None, netrules_path=None):
+ if network_config:
+ ns = network_state.parse_net_config_data(network_config)
+ elif state:
+ ns = state
+ else:
+ raise ValueError("Expected data or state, got neither")
+
+ if eni_path is None:
+ eni_path = 'etc/network/interfaces'
+
+ renderer = eni.Renderer(
+ config={'eni_path': eni_path, 'links_path_prefix': links_prefix,
+ 'netrules_path': netrules_path})
+
+ renderer.render_network_state(self.tmp_dir, ns)
+ return dir2dict(self.tmp_dir)
+
def testsimple_convert_and_render(self):
- network_config = net.convert_eni_data(EXAMPLE_ENI)
- ns = net.parse_net_config_data(network_config)
- eni = net.render_interfaces(ns)
- print("Eni looks like:\n%s" % eni)
- raise Exception("FOO")
+ network_config = eni.convert_eni_data(EXAMPLE_ENI)
+ ns = network_state.parse_net_config_data(network_config)
+ eni_path = 'etc/network/interfaces.d/my.interfaces'
+ eni_full_path = os.path.join(self.tmp_dir, eni_path)
+ renderer = eni.Renderer(config={'eni_path': eni_path})
+ renderer.render_network_state(self.tmp_dir, ns)
+ eni_content = util.load_file(eni_full_path)
+ print("Eni looks like: %s" % eni_content)
+ raise Exception("FOO1")
def testsimple_render_all(self):
- network_config = yaml.load(NETWORK_YAML_ALL)
- ns = net.parse_net_config_data(network_config)
- eni = net.render_interfaces(ns)
- print("Eni looks like:\n%s" % eni)
- raise Exception("FOO")
+ files = self._render_and_read(network_config=yaml.load(NETWORK_YAML_ALL))
+ print("files: %s" % files)
+ raise Exception("FOO2")
- def testsimple_render_small(self):
+ def skiptestsimple_render_small(self):
network_config = yaml.load(NETWORK_YAML_SMALL)
- ns = net.parse_net_config_data(network_config)
+ ns = network_state.parse_net_config_data(network_config)
eni = net.render_interfaces(ns)
print("Eni looks like:\n%s" % eni)
- raise Exception("FOO")
+ raise Exception("FOO3")
def _gzip_data(data):
diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py
index 5cad8406..20ca23df 100644
--- a/tests/unittests/test_reporting.py
+++ b/tests/unittests/test_reporting.py
@@ -7,7 +7,9 @@ from cloudinit import reporting
from cloudinit.reporting import events
from cloudinit.reporting import handlers
-from .helpers import (mock, TestCase)
+import mock
+
+from .helpers import TestCase
def _fake_registry():
diff --git a/tests/unittests/test_rh_subscription.py b/tests/unittests/test_rh_subscription.py
index b84c807b..891dbe77 100644
--- a/tests/unittests/test_rh_subscription.py
+++ b/tests/unittests/test_rh_subscription.py
@@ -1,12 +1,24 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
from cloudinit.config import cc_rh_subscription
from cloudinit import util
-import logging
-import mock
-import unittest
+from .helpers import TestCase, mock
-class GoodTests(unittest.TestCase):
+class GoodTests(TestCase):
def setUp(self):
super(GoodTests, self).setUp()
self.name = "cc_rh_subscription"
@@ -93,7 +105,7 @@ class GoodTests(unittest.TestCase):
self.assertEqual(self.SM._sub_man_cli.call_count, 9)
-class TestBadInput(unittest.TestCase):
+class TestBadInput(TestCase):
name = "cc_rh_subscription"
cloud_init = None
log = logging.getLogger("bad_tests")