summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2016-06-10 14:22:17 -0700
committerJoshua Harlow <harlowja@gmail.com>2016-06-10 14:22:17 -0700
commita8de234ec60c76b7b788513e8111be04ad9205fb (patch)
tree28f16138e108fab6b66a748d9ef697d732c4107a /tests/unittests
parenta3720feb537cb5189bccf4889b601704a4cdf1da (diff)
parent6ada153ebfd9483d76f904dafaa1fc80f61c9205 (diff)
downloadvyos-cloud-init-a8de234ec60c76b7b788513e8111be04ad9205fb.tar.gz
vyos-cloud-init-a8de234ec60c76b7b788513e8111be04ad9205fb.zip
Refactor a large part of the networking code.
Splits off distro specific code into specific files so that other kinds of networking configuration can be written by the various distro(s) that cloud-init supports. It also isolates some of the cloudinit.net code so that it can be more easily used on its own (and incorporated into other projects such as curtin). During this process it adds tests so that the net process can be tested (to some level) so that the format conversion processes can be tested going forward.
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py85
-rw-r--r--tests/unittests/test__init__.py17
-rw-r--r--tests/unittests/test_cli.py7
-rw-r--r--tests/unittests/test_cs_util.py27
-rw-r--r--tests/unittests/test_datasource/test_azure.py12
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py12
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py2
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py11
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py170
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py15
-rw-r--r--tests/unittests/test_datasource/test_smartos.py6
-rw-r--r--tests/unittests/test_net.py95
-rw-r--r--tests/unittests/test_reporting.py4
-rw-r--r--tests/unittests/test_rh_subscription.py22
14 files changed, 277 insertions, 208 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 50b2bd72..8d46a8bf 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -7,13 +7,11 @@ import sys
import tempfile
import unittest
+import mock
import six
+import unittest2
try:
- from unittest import mock
-except ImportError:
- import mock
-try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
@@ -21,6 +19,9 @@ except ImportError:
from cloudinit import helpers as ch
from cloudinit import util
+# Used for skipping tests
+SkipTest = unittest2.SkipTest
+
# Used for detecting different python versions
PY2 = False
PY26 = False
@@ -44,78 +45,6 @@ else:
if _PY_MINOR == 4 and _PY_MICRO < 3:
FIX_HTTPRETTY = True
-if PY26:
- # For now add these on, taken from python 2.7 + slightly adjusted. Drop
- # all this once Python 2.6 is dropped as a minimum requirement.
- class TestCase(unittest.TestCase):
- def setUp(self):
- super(TestCase, self).setUp()
- self.__all_cleanups = ExitStack()
-
- def tearDown(self):
- self.__all_cleanups.close()
- unittest.TestCase.tearDown(self)
-
- def addCleanup(self, function, *args, **kws):
- self.__all_cleanups.callback(function, *args, **kws)
-
- def assertIs(self, expr1, expr2, msg=None):
- if expr1 is not expr2:
- standardMsg = '%r is not %r' % (expr1, expr2)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIn(self, member, container, msg=None):
- if member not in container:
- standardMsg = '%r not found in %r' % (member, container)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertNotIn(self, member, container, msg=None):
- if member in container:
- standardMsg = '%r unexpectedly found in %r'
- standardMsg = standardMsg % (member, container)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIsNone(self, value, msg=None):
- if value is not None:
- standardMsg = '%r is not None'
- standardMsg = standardMsg % (value)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertIsInstance(self, obj, cls, msg=None):
- """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
- default message."""
- if not isinstance(obj, cls):
- standardMsg = '%s is not an instance of %r' % (repr(obj), cls)
- self.fail(self._formatMessage(msg, standardMsg))
-
- def assertDictContainsSubset(self, expected, actual, msg=None):
- missing = []
- mismatched = []
- for k, v in expected.items():
- if k not in actual:
- missing.append(k)
- elif actual[k] != v:
- mismatched.append('%r, expected: %r, actual: %r'
- % (k, v, actual[k]))
-
- if len(missing) == 0 and len(mismatched) == 0:
- return
-
- standardMsg = ''
- if missing:
- standardMsg = 'Missing: %r' % ','.join(m for m in missing)
- if mismatched:
- if standardMsg:
- standardMsg += '; '
- standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
-
- self.fail(self._formatMessage(msg, standardMsg))
-
-
-else:
- class TestCase(unittest.TestCase):
- pass
-
# Makes the old path start
# with new base instead of whatever
@@ -151,6 +80,10 @@ def retarget_many_wrapper(new_base, am, old_func):
return wrapper
+class TestCase(unittest2.TestCase):
+ pass
+
+
class ResourceUsingTestCase(TestCase):
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 153f1658..0154784a 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,16 +1,6 @@
import os
import shutil
import tempfile
-import unittest
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
from cloudinit import handlers
from cloudinit import helpers
@@ -18,7 +8,7 @@ from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
-from .helpers import TestCase
+from .helpers import TestCase, ExitStack, mock
class FakeModule(handlers.Handler):
@@ -99,9 +89,10 @@ class TestWalkerHandleHandler(TestCase):
self.assertEqual(self.data['handlercount'], 0)
-class TestHandlerHandlePart(unittest.TestCase):
+class TestHandlerHandlePart(TestCase):
def setUp(self):
+ super(TestHandlerHandlePart, self).setUp()
self.data = "fake data"
self.ctype = "fake ctype"
self.filename = "fake filename"
@@ -177,7 +168,7 @@ class TestHandlerHandlePart(unittest.TestCase):
self.data, self.ctype, self.filename, self.payload)
-class TestCmdlineUrl(unittest.TestCase):
+class TestCmdlineUrl(TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
index f537bd83..163ce64c 100644
--- a/tests/unittests/test_cli.py
+++ b/tests/unittests/test_cli.py
@@ -4,12 +4,7 @@ import six
import sys
from . import helpers as test_helpers
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
+mock = test_helpers.mock
BIN_CLOUDINIT = "bin/cloud-init"
diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py
index d7273035..56c9ce9e 100644
--- a/tests/unittests/test_cs_util.py
+++ b/tests/unittests/test_cs_util.py
@@ -1,21 +1,9 @@
from __future__ import print_function
-import sys
-import unittest
+from . import helpers as test_helpers
from cloudinit.cs_utils import Cepko
-try:
- skip = unittest.skip
-except AttributeError:
- # Python 2.6. Doesn't have to be high fidelity.
- def skip(reason):
- def decorator(func):
- def wrapper(*args, **kws):
- print(reason, file=sys.stderr)
- return wrapper
- return decorator
-
SERVER_CONTEXT = {
"cpu": 1000,
@@ -43,18 +31,9 @@ class CepkoMock(Cepko):
# 2015-01-22 BAW: This test is completely useless because it only ever tests
# the CepkoMock object. Even in its original form, I don't think it ever
# touched the underlying Cepko class methods.
-@skip('This test is completely useless')
-class CepkoResultTests(unittest.TestCase):
+class CepkoResultTests(test_helpers.TestCase):
def setUp(self):
- pass
- # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
- # spec=CepkoMock,
- # count=False,
- # passthrough=False)
- # self.mocked()
- # self.mocker.result(CepkoMock())
- # self.mocker.replay()
- # self.c = Cepko()
+ raise test_helpers.SkipTest('This test is completely useless')
def test_getitem(self):
result = self.c.all()
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 5f3eb31f..e90e903c 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,16 +1,8 @@
from cloudinit import helpers
from cloudinit.util import b64e, decode_binary, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import TestCase, populate_dir
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
import crypt
import os
@@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase):
def setUp(self):
super(TestAzureDataSource, self).setUp()
+ if PY26:
+ raise SkipTest("Does not work on python 2.6")
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 70e8f7f2..65202ff0 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -2,17 +2,7 @@ import os
from cloudinit.sources.helpers import azure as azure_helper
-from ..helpers import TestCase
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import ExitStack, mock, TestCase
GOAL_STATE_TEMPLATE = """\
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 772d189a..2a42ce0c 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -1,4 +1,5 @@
# coding: utf-8
+
import copy
from cloudinit.cs_utils import Cepko
@@ -6,7 +7,6 @@ from cloudinit.sources import DataSourceCloudSigma
from .. import helpers as test_helpers
-
SERVER_CONTEXT = {
"cpu": 1000,
"cpus_instead_of_cores": False,
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index 974b3704..b1aab17b 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -1,16 +1,7 @@
from cloudinit import helpers
from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
-from ..helpers import TestCase
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, mock, ExitStack
class TestCloudStackPasswordFetching(TestCase):
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 83d75f9c..18551b92 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -5,23 +5,15 @@ import shutil
import six
import tempfile
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
-
from cloudinit import helpers
-from cloudinit import net
+from cloudinit.net import eni
+from cloudinit.net import network_state
from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from ..helpers import TestCase
+from ..helpers import TestCase, ExitStack, mock
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -115,6 +107,7 @@ KNOWN_MACS = {
'fa:16:3e:d4:57:ad': 'enp0s2',
'fa:16:3e:dd:50:9a': 'foo1',
'fa:16:3e:a8:14:69': 'foo2',
+ 'fa:16:3e:ed:9a:59': 'foo3',
}
CFG_DRIVE_FILES_V2 = {
@@ -377,35 +370,150 @@ class TestConfigDriveDataSource(TestCase):
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
- def test_pubkeys_v2(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
- def test_network_data_is_found(self):
+
+class TestNetJson(TestCase):
+ def setUp(self):
+ super(TestNetJson, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.maxDiff = None
+
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- self.assertEqual(myds.network_json, NETWORK_DATA)
+ self.assertIsNotNone(myds.network_json)
- def test_network_config_is_converted(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- network_config = ds.convert_network_data(NETWORK_DATA,
- known_macs=KNOWN_MACS)
+ network_config = openstack.convert_net_json(NETWORK_DATA,
+ known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
+ def test_network_config_conversions(self):
+ """Tests a bunch of input network json and checks the
+ expected conversions."""
+ in_datas = [
+ NETWORK_DATA,
+ {
+ 'services': [{'type': 'dns', 'address': '172.19.0.12'}],
+ 'networks': [{
+ 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4',
+ 'type': 'ipv4',
+ 'netmask': '255.255.252.0',
+ 'link': 'tap1a81968a-79',
+ 'routes': [{
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ 'gateway': '172.19.3.254',
+ }],
+ 'ip_address': '172.19.1.34',
+ 'id': 'network0',
+ }],
+ 'links': [{
+ 'type': 'bridge',
+ 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f',
+ 'ethernet_mac_address': 'fa:16:3e:ed:9a:59',
+ 'id': 'tap1a81968a-79',
+ 'mtu': None,
+ }],
+ },
+ ]
+ out_datas = [
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:69:b0:58',
+ 'name': 'enp0s1',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:d4:57:ad',
+ 'name': 'enp0s2',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:05:30:fe',
+ 'name': 'nic0',
+ 'mtu': None,
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.44.24',
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.47.54',
+ }
+ ],
+
+ },
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'name': 'foo3',
+ 'mac_address': 'fa:16:3e:ed:9a:59',
+ 'mtu': None,
+ 'type': 'physical',
+ 'subnets': [
+ {
+ 'address': '172.19.1.34',
+ 'netmask': '255.255.252.0',
+ 'type': 'static',
+ 'ipv4': True,
+ 'routes': [{
+ 'gateway': '172.19.3.254',
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ }],
+ }
+ ]
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '172.19.0.12',
+ }
+ ],
+ },
+ ]
+ for in_data, out_data in zip(in_datas, out_datas):
+ conv_data = openstack.convert_net_json(in_data,
+ known_macs=KNOWN_MACS)
+ self.assertEqual(out_data, conv_data)
+
class TestConvertNetworkData(TestCase):
+ def setUp(self):
+ super(TestConvertNetworkData, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
def _getnames_in_config(self, ncfg):
return set([n['name'] for n in ncfg['config']
if n['type'] == 'physical'])
def test_conversion_fills_names(self):
- ncfg = ds.convert_network_data(NETWORK_DATA, known_macs=KNOWN_MACS)
+ ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS)
expected = set(['nic0', 'enp0s1', 'enp0s2'])
found = self._getnames_in_config(ncfg)
self.assertEqual(found, expected)
@@ -417,18 +525,19 @@ class TestConvertNetworkData(TestCase):
'fa:16:3e:69:b0:58': 'ens1'})
get_interfaces_by_mac.return_value = macs
- ncfg = ds.convert_network_data(NETWORK_DATA)
+ ncfg = openstack.convert_net_json(NETWORK_DATA)
expected = set(['nic0', 'ens1', 'enp0s2'])
found = self._getnames_in_config(ncfg)
self.assertEqual(found, expected)
def test_convert_raises_value_error_on_missing_name(self):
macs = {'aa:aa:aa:aa:aa:00': 'ens1'}
- self.assertRaises(ValueError, ds.convert_network_data,
+ self.assertRaises(ValueError, openstack.convert_net_json,
NETWORK_DATA, known_macs=macs)
def test_conversion_with_route(self):
- ncfg = ds.convert_network_data(NETWORK_DATA_2, known_macs=KNOWN_MACS)
+ ncfg = openstack.convert_net_json(NETWORK_DATA_2,
+ known_macs=KNOWN_MACS)
# not the best test, but see that we get a route in the
# network config and that it gets rendered to an ENI file
routes = []
@@ -438,15 +547,23 @@ class TestConvertNetworkData(TestCase):
self.assertIn(
{'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'},
routes)
- eni = net.render_interfaces(net.parse_net_config_data(ncfg))
- self.assertIn("route add default gw 2.2.2.9", eni)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+ self.assertIn("route add default gw 2.2.2.9", eni_rendering)
def cfg_ds_from_dir(seed_d):
- found = ds.read_config_drive(seed_d)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
helpers.Paths({}))
- populate_ds_from_read_config(cfg_ds, seed_d, found)
+ cfg_ds.seed_dir = seed_d
+ cfg_ds.known_macs = KNOWN_MACS.copy()
+ if not cfg_ds.get_data():
+ raise RuntimeError("Data source did not extract itself from"
+ " seed directory %s" % seed_d)
return cfg_ds
@@ -460,7 +577,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.userdata_raw = results.get('userdata')
cfg_ds.version = results.get('version')
cfg_ds.network_json = results.get('networkdata')
- cfg_ds._network_config = ds.convert_network_data(
+ cfg_ds._network_config = openstack.convert_net_json(
cfg_ds.network_json, known_macs=KNOWN_MACS)
@@ -474,7 +591,6 @@ def populate_dir(seed_dir, files):
mode = "w"
else:
mode = "wb"
-
with open(path, mode) as fp:
fp.write(content)
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 3c528c23..b0fa1130 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,22 +1,13 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import TestCase, populate_dir
+from ..helpers import TestCase, populate_dir, mock, ExitStack
import os
import shutil
import tempfile
-import unittest
-import yaml
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+import yaml
class TestNoCloudDataSource(TestCase):
@@ -139,7 +130,7 @@ class TestNoCloudDataSource(TestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(unittest.TestCase):
+class TestParseCommandLineData(TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index ec57cff1..9c6c8768 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -34,11 +34,12 @@ import stat
import tempfile
import uuid
-import serial
+from cloudinit import serial
+from cloudinit.sources import DataSourceSmartOS
+
import six
from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
from ..helpers import mock, FilesystemMockingTestCase, TestCase
@@ -380,6 +381,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
+
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 624a9aa8..7998111a 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1,6 +1,10 @@
from cloudinit import net
+from cloudinit.net import cmdline
+from cloudinit.net import eni
+from cloudinit.net import network_state
from cloudinit import util
+from .helpers import mock
from .helpers import TestCase
import base64
@@ -9,6 +13,8 @@ import gzip
import io
import json
import os
+import shutil
+import tempfile
DHCP_CONTENT_1 = """
DEVICE='eth0'
@@ -69,21 +75,87 @@ STATIC_EXPECTED_1 = {
}
-class TestNetConfigParsing(TestCase):
+class TestEniNetRendering(TestCase):
+
+ @mock.patch("cloudinit.net.sys_dev_path")
+ @mock.patch("cloudinit.net.sys_netdev_info")
+ @mock.patch("cloudinit.net.get_devicelist")
+ def test_default_generation(self, mock_get_devicelist,
+ mock_sys_netdev_info,
+ mock_sys_dev_path):
+ mock_get_devicelist.return_value = ['eth1000', 'lo']
+
+ dev_characteristics = {
+ 'eth1000': {
+ "bridge": False,
+ "carrier": False,
+ "dormant": False,
+ "operstate": "down",
+ "address": "07-1C-C6-75-A4-BE",
+ }
+ }
+
+ def netdev_info(name, field):
+ return dev_characteristics[name][field]
+
+ mock_sys_netdev_info.side_effect = netdev_info
+
+ tmp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmp_dir)
+
+ def sys_dev_path(devname, path=""):
+ return tmp_dir + devname + "/" + path
+
+ for dev in dev_characteristics:
+ os.makedirs(os.path.join(tmp_dir, dev))
+ with open(os.path.join(tmp_dir, dev, 'operstate'), 'w') as fh:
+ fh.write("down")
+
+ mock_sys_dev_path.side_effect = sys_dev_path
+
+ network_cfg = net.generate_fallback_config()
+ ns = network_state.parse_net_config_data(network_cfg,
+ skip_broken=False)
+
+ render_dir = os.path.join(tmp_dir, "render")
+ os.makedirs(render_dir)
+
+ renderer = eni.Renderer()
+ renderer.render_network_state(render_dir, ns,
+ eni="interfaces",
+ links_prefix=None,
+ netrules=None)
+
+ self.assertTrue(os.path.exists(os.path.join(render_dir,
+ 'interfaces')))
+ with open(os.path.join(render_dir, 'interfaces')) as fh:
+ contents = fh.read()
+
+ expected = """
+auto lo
+iface lo inet loopback
+
+auto eth1000
+iface eth1000 inet dhcp
+"""
+ self.assertEqual(expected.lstrip(), contents.lstrip())
+
+
+class TestCmdlineConfigParsing(TestCase):
simple_cfg = {
'config': [{"type": "physical", "name": "eth0",
"mac_address": "c0:d6:9f:2c:e8:80",
"subnets": [{"type": "dhcp"}]}]}
- def test_klibc_convert_dhcp(self):
- found = net._klibc_to_config_entry(DHCP_CONTENT_1)
+ def test_cmdline_convert_dhcp(self):
+ found = cmdline._klibc_to_config_entry(DHCP_CONTENT_1)
self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
- def test_klibc_convert_static(self):
- found = net._klibc_to_config_entry(STATIC_CONTENT_1)
+ def test_cmdline_convert_static(self):
+ found = cmdline._klibc_to_config_entry(STATIC_CONTENT_1)
self.assertEqual(found, ('eth1', STATIC_EXPECTED_1))
- def test_config_from_klibc_net_cfg(self):
+ def test_config_from_cmdline_net_cfg(self):
files = []
pairs = (('net-eth0.cfg', DHCP_CONTENT_1),
('net-eth1.cfg', STATIC_CONTENT_1))
@@ -104,21 +176,22 @@ class TestNetConfigParsing(TestCase):
files.append(fp)
util.write_file(fp, content)
- found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs)
+ found = cmdline.config_from_klibc_net_cfg(files=files,
+ mac_addrs=macs)
self.assertEqual(found, expected)
def test_cmdline_with_b64(self):
data = base64.b64encode(json.dumps(self.simple_cfg).encode())
encoded_text = data.decode()
- cmdline = 'ro network-config=' + encoded_text + ' root=foo'
- found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline)
self.assertEqual(found, self.simple_cfg)
def test_cmdline_with_b64_gz(self):
data = _gzip_data(json.dumps(self.simple_cfg).encode())
encoded_text = base64.b64encode(data).decode()
- cmdline = 'ro network-config=' + encoded_text + ' root=foo'
- found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline)
self.assertEqual(found, self.simple_cfg)
diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py
index 5cad8406..20ca23df 100644
--- a/tests/unittests/test_reporting.py
+++ b/tests/unittests/test_reporting.py
@@ -7,7 +7,9 @@ from cloudinit import reporting
from cloudinit.reporting import events
from cloudinit.reporting import handlers
-from .helpers import (mock, TestCase)
+import mock
+
+from .helpers import TestCase
def _fake_registry():
diff --git a/tests/unittests/test_rh_subscription.py b/tests/unittests/test_rh_subscription.py
index b84c807b..891dbe77 100644
--- a/tests/unittests/test_rh_subscription.py
+++ b/tests/unittests/test_rh_subscription.py
@@ -1,12 +1,24 @@
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
from cloudinit.config import cc_rh_subscription
from cloudinit import util
-import logging
-import mock
-import unittest
+from .helpers import TestCase, mock
-class GoodTests(unittest.TestCase):
+class GoodTests(TestCase):
def setUp(self):
super(GoodTests, self).setUp()
self.name = "cc_rh_subscription"
@@ -93,7 +105,7 @@ class GoodTests(unittest.TestCase):
self.assertEqual(self.SM._sub_man_cli.call_count, 9)
-class TestBadInput(unittest.TestCase):
+class TestBadInput(TestCase):
name = "cc_rh_subscription"
cloud_init = None
log = logging.getLogger("bad_tests")