diff options
author | Christian Ehrhardt <christian.ehrhardt@canonical.com> | 2016-08-10 16:43:14 +0200 |
---|---|---|
committer | Scott Moser <smoser@brickies.net> | 2016-08-18 11:02:44 -0400 |
commit | d861415ff9ab816b1183b8c58ec35348be4fd458 (patch) | |
tree | d3cc08c685f58208979c0fc6e941320cc1347b2b /tests/unittests | |
parent | 648dbbf6b090c81e989f1ab70bf99f4de16a6a70 (diff) | |
download | vyos-cloud-init-d861415ff9ab816b1183b8c58ec35348be4fd458.tar.gz vyos-cloud-init-d861415ff9ab816b1183b8c58ec35348be4fd458.zip |
Apt: add new apt configuration format
This adds an improved apt configuration format that is fully backwards
compatible with previous behavior. This is mostly copied from curtin's
implementation.
It does:
* clean up and centralizes many of the top level 'apt_*' values that
previously existed into a single top level 'apt'key.
* support a 'source' in apt/sources/entry that has only a key
* documents new features and adds tests.
See the added doc/examples/cloud-config-apt.txt for more information.
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/test_distros/test_generic.py | 3 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_conf_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_configure.py) | 24 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_configure_sources_list.py) | 64 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py | 187 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_source_v1.py (renamed from tests/unittests/test_handler/test_handler_apt_source.py) | 135 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_source_v3.py | 1103 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 69 |
7 files changed, 1498 insertions, 87 deletions
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index 96fa0811..24ad115f 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -226,8 +226,5 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): os.symlink('/', '/run/systemd/system') self.assertFalse(d.uses_systemd()) -# def _get_package_mirror_info(mirror_info, availability_zone=None, -# mirror_filter=util.search_for_mirror): - # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_conf_v1.py index d1dca2c4..95fd1da2 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ b/tests/unittests/test_handler/test_handler_apt_conf_v1.py @@ -27,7 +27,7 @@ class TestAptProxyConfig(TestCase): contents, flags=re.IGNORECASE) def test_apt_proxy_written(self): - cfg = {'apt_proxy': 'myproxy'} + cfg = {'proxy': 'myproxy'} cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.pfile)) @@ -37,7 +37,7 @@ class TestAptProxyConfig(TestCase): self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) def test_apt_http_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy'} + cfg = {'http_proxy': 'myproxy'} cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.pfile)) @@ -47,13 +47,13 @@ class TestAptProxyConfig(TestCase): self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) def test_apt_all_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy_http_proxy', - 'apt_https_proxy': 'myproxy_https_proxy', - 'apt_ftp_proxy': 'myproxy_ftp_proxy'} + cfg = {'http_proxy': 'myproxy_http_proxy', + 'https_proxy': 'myproxy_https_proxy', + 'ftp_proxy': 'myproxy_ftp_proxy'} - values = {'http': cfg['apt_http_proxy'], - 'https': cfg['apt_https_proxy'], - 'ftp': cfg['apt_ftp_proxy'], + values = {'http': cfg['http_proxy'], + 'https': cfg['https_proxy'], + 'ftp': cfg['ftp_proxy'], } cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) @@ -74,7 +74,7 @@ class TestAptProxyConfig(TestCase): def test_proxy_replaced(self): util.write_file(self.cfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_proxy': "foo"}, + cc_apt_configure.apply_apt_config({'proxy': "foo"}, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.pfile)) contents = load_tfile_or_url(self.pfile) @@ -82,7 +82,7 @@ class TestAptProxyConfig(TestCase): def test_config_written(self): payload = 'this is my apt config' - cfg = {'apt_config': payload} + cfg = {'conf': payload} cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) @@ -93,13 +93,13 @@ class TestAptProxyConfig(TestCase): def test_config_replaced(self): util.write_file(self.pfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_config': "foo"}, + cc_apt_configure.apply_apt_config({'conf': "foo"}, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.cfile)) self.assertEqual(load_tfile_or_url(self.cfile), "foo") def test_config_deleted(self): - # if no 'apt_config' is provided, delete any previously written file + # if no 'conf' is provided, delete any previously written file util.write_file(self.pfile, "content doesnt matter") cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) self.assertFalse(os.path.isfile(self.pfile)) diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py index acde0863..f4411869 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py @@ -79,6 +79,15 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): self.new_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.new_root) + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': "fakerelease"} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + def _get_cloud(self, distro, metadata=None): self.patchUtils(self.new_root) paths = helpers.Paths({}) @@ -102,25 +111,38 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): cfg = {'apt_mirror': mirror} mycloud = self._get_cloud(distro) - with mock.patch.object(templater, 'render_to_file') as mocktmpl: - with mock.patch.object(os.path, 'isfile', - return_value=True) as mockisfile: - with mock.patch.object(util, 'rename'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) + with mock.patch.object(util, 'write_file') as mockwf: + with mock.patch.object(util, 'load_file', + return_value="faketmpl") as mocklf: + with mock.patch.object(os.path, 'isfile', + return_value=True) as mockisfile: + with mock.patch.object(templater, 'render_string', + return_value="fake") as mockrnd: + with mock.patch.object(util, 'rename'): + cc_apt_configure.handle("test", cfg, mycloud, + LOG, None) mockisfile.assert_any_call( ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) - mocktmpl.assert_called_once_with( - ('/etc/cloud/templates/sources.list.%s.tmpl' % distro), - '/etc/apt/sources.list', - {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck}) - - def test_apt_source_list_debian(self): + mocklf.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mockrnd.assert_called_once_with('faketmpl', + {'RELEASE': 'fakerelease', + 'PRIMARY': mirrorcheck, + 'MIRROR': mirrorcheck, + 'SECURITY': mirrorcheck, + 'codename': 'fakerelease', + 'primary': mirrorcheck, + 'mirror': mirrorcheck, + 'security': mirrorcheck}) + mockwf.assert_called_once_with('/etc/apt/sources.list', 'fake', + mode=0o644) + + def test_apt_v1_source_list_debian(self): """Test rendering of a source.list from template for debian""" self.apt_source_list('debian', 'http://httpredir.debian.org/debian') - def test_apt_source_list_ubuntu(self): + def test_apt_v1_source_list_ubuntu(self): """Test rendering of a source.list from template for ubuntu""" self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/') @@ -134,7 +156,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): print("Faking SUCCESS for '%s'" % name) return True - def test_apt_srcl_debian_mirrorfail(self): + def test_apt_v1_srcl_debian_mirrorfail(self): """Test rendering of a source.list from template for debian""" with mock.patch.object(util, 'is_resolvable', side_effect=self.myresolve) as mockresolve: @@ -145,7 +167,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): mockresolve.assert_any_call("does.not.exist") mockresolve.assert_any_call("httpredir.debian.org") - def test_apt_srcl_ubuntu_mirrorfail(self): + def test_apt_v1_srcl_ubuntu_mirrorfail(self): """Test rendering of a source.list from template for ubuntu""" with mock.patch.object(util, 'is_resolvable', side_effect=self.myresolve) as mockresolve: @@ -156,7 +178,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): mockresolve.assert_any_call("does.not.exist") mockresolve.assert_any_call("archive.ubuntu.com") - def test_apt_srcl_custom(self): + def test_apt_v1_srcl_custom(self): """Test rendering from a custom source.list template""" cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) mycloud = self._get_cloud('ubuntu') @@ -164,12 +186,10 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): # the second mock restores the original subp with mock.patch.object(util, 'write_file') as mockwrite: with mock.patch.object(util, 'subp', self.subp): - with mock.patch.object(cc_apt_configure, 'get_release', - return_value='fakerelease'): - with mock.patch.object(Distro, 'get_primary_arch', - return_value='amd64'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) + with mock.patch.object(Distro, 'get_primary_arch', + return_value='amd64'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) mockwrite.assert_called_once_with( '/etc/apt/sources.list', diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py new file mode 100644 index 00000000..e53b0450 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py @@ -0,0 +1,187 @@ +""" test_apt_custom_sources_list +Test templating of custom sources list +""" +import logging +import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from cloudinit.distros.debian import Distro + +from .. import helpers as t_help + +LOG = logging.getLogger(__name__) + +TARGET = "/" + +# Input and expected output for the custom template +YAML_TEXT_CUSTOM_SL = """ +apt: + primary: + - arches: [default] + uri: http://test.ubuntu.com/ubuntu/ + security: + - arches: [default] + uri: http://testsec.ubuntu.com/ubuntu/ + sources_list: | + + # Note, this file is written by cloud-init at install time. It should not + # end up on the installed system itself. + # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to + # newer versions of the distribution. + deb $MIRROR $RELEASE main restricted + deb-src $MIRROR $RELEASE main restricted + deb $PRIMARY $RELEASE universe restricted + deb $SECURITY $RELEASE-security multiverse + # FIND_SOMETHING_SPECIAL +""" + +EXPECTED_CONVERTED_CONTENT = """ +# Note, this file is written by cloud-init at install time. It should not +# end up on the installed system itself. +# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to +# newer versions of the distribution. +deb http://test.ubuntu.com/ubuntu/ fakerel main restricted +deb-src http://test.ubuntu.com/ubuntu/ fakerel main restricted +deb http://test.ubuntu.com/ubuntu/ fakerel universe restricted +deb http://testsec.ubuntu.com/ubuntu/ fakerel-security multiverse +# FIND_SOMETHING_SPECIAL +""" + +# mocked to be independent to the unittest system +MOCKED_APT_SRC_LIST = """ +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""" + +EXPECTED_BASE_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""") + +EXPECTED_MIRROR_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-security main restricted +""") + +EXPECTED_PRIMSEC_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""") + + +class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): + """TestAptSourceConfigSourceList - Class to test sources list rendering""" + def setUp(self): + super(TestAptSourceConfigSourceList, self).setUp() + self.subp = util.subp + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': "fakerel"} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def _apt_source_list(self, cfg, expected, distro): + "_apt_source_list - Test rendering from template (generic)" + + # entry at top level now, wrap in 'apt' key + cfg = {'apt': cfg} + mycloud = self._get_cloud(distro) + with mock.patch.object(util, 'write_file') as mockwf: + with mock.patch.object(util, 'load_file', + return_value=MOCKED_APT_SRC_LIST) as mocklf: + with mock.patch.object(os.path, 'isfile', + return_value=True) as mockisfile: + with mock.patch.object(util, 'rename'): + cc_apt_configure.handle("test", cfg, mycloud, + LOG, None) + + # check if it would have loaded the distro template + mockisfile.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mocklf.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + # check expected content in result + mockwf.assert_called_once_with('/etc/apt/sources.list', expected, + mode=0o644) + + def test_apt_v3_source_list_debian(self): + """test_apt_v3_source_list_debian - without custom sources or parms""" + cfg = {} + self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'debian') + + def test_apt_v3_source_list_ubuntu(self): + """test_apt_v3_source_list_ubuntu - without custom sources or parms""" + cfg = {} + self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'ubuntu') + + def test_apt_v3_source_list_psm(self): + """test_apt_v3_source_list_psm - Test specifying prim+sec mirrors""" + pm = 'http://test.ubuntu.com/ubuntu/' + sm = 'http://testsec.ubuntu.com/ubuntu/' + cfg = {'preserve_sources_list': False, + 'primary': [{'arches': ["default"], + 'uri': pm}], + 'security': [{'arches': ["default"], + 'uri': sm}]} + + self._apt_source_list(cfg, EXPECTED_PRIMSEC_CONTENT, 'ubuntu') + + def test_apt_v3_srcl_custom(self): + """test_apt_v3_srcl_custom - Test rendering a custom source template""" + cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) + mycloud = self._get_cloud('ubuntu') + + # the second mock restores the original subp + with mock.patch.object(util, 'write_file') as mockwrite: + with mock.patch.object(util, 'subp', self.subp): + with mock.patch.object(Distro, 'get_primary_arch', + return_value='amd64'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) + + calls = [call('/etc/apt/sources.list', + EXPECTED_CONVERTED_CONTENT, + mode=0o644)] + mockwrite.assert_has_calls(calls) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py index 99a4d860..d96779c5 100644 --- a/tests/unittests/test_handler/test_handler_apt_source.py +++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py @@ -1,5 +1,7 @@ -""" test_handler_apt_source +""" test_handler_apt_source_v1 Testing various config variations of the apt_source config +This calls all things with v1 format to stress the conversion code on top of +the actually tested code. """ import os import re @@ -32,6 +34,8 @@ S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== =ACB2 -----END PGP PUBLIC KEY BLOCK-----""" +ADD_APT_REPO_MATCH = r"^[\w-]+:\w" + def load_tfile_or_url(*args, **kwargs): """load_tfile_or_url @@ -40,6 +44,19 @@ def load_tfile_or_url(*args, **kwargs): return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) +class FakeDistro(object): + """Fake Distro helper object""" + def update_package_sources(self): + """Fake update_package_sources helper method""" + return + + +class FakeCloud(object): + """Fake Cloud helper object""" + def __init__(self): + self.distro = FakeDistro() + + class TestAptSourceConfig(TestCase): """TestAptSourceConfig Main Class to test apt_source configs @@ -54,25 +71,39 @@ class TestAptSourceConfig(TestCase): self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") self.join = os.path.join + self.matcher = re.compile(ADD_APT_REPO_MATCH).search # mock fallback filename into writable tmp dir self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/", "cloud_config_sources.list") - patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release") - get_rel = patcher.start() - get_rel.return_value = self.release - self.addCleanup(patcher.stop) + self.fakecloud = FakeCloud() - @staticmethod - def _get_default_params(): + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': self.release} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + + def _get_default_params(self): """get_default_params Get the most basic default mrror and release info to be used in tests """ params = {} - params['RELEASE'] = cc_apt_configure.get_release() + params['RELEASE'] = self.release params['MIRROR'] = "http://archive.ubuntu.com/ubuntu" return params + def wrapv1conf(self, cfg): + params = self._get_default_params() + # old v1 list format under old keys, but callabe to main handler + # disable source.list rendering and set mirror to avoid other code + return {'apt_preserve_sources_list': True, + 'apt_mirror': params['MIRROR'], + 'apt_sources': cfg} + def myjoin(self, *args, **kwargs): """myjoin - redir into writable tmpdir""" if (args[0] == "/etc/apt/sources.list.d/" and @@ -86,9 +117,9 @@ class TestAptSourceConfig(TestCase): """apt_src_basic Test Fix deb source string, has to overwrite mirror conf in params """ - params = self._get_default_params() + cfg = self.wrapv1conf(cfg) - cc_apt_configure.add_apt_sources(cfg, params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) self.assertTrue(os.path.isfile(filename)) @@ -181,8 +212,9 @@ class TestAptSourceConfig(TestCase): """apt_src_replace Test Autoreplacement of MIRROR and RELEASE in source specs """ + cfg = self.wrapv1conf(cfg) params = self._get_default_params() - cc_apt_configure.add_apt_sources(cfg, params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) self.assertTrue(os.path.isfile(filename)) @@ -246,16 +278,18 @@ class TestAptSourceConfig(TestCase): """apt_src_keyid Test specification of a source + keyid """ - params = self._get_default_params() + cfg = self.wrapv1conf(cfg) with mock.patch.object(util, 'subp', return_value=('fakekey 1234', '')) as mockobj: - cc_apt_configure.add_apt_sources(cfg, params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) # check if it added the right ammount of keys calls = [] for _ in range(keynum): - calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234')) + calls.append(call(['apt-key', 'add', '-'], + data=b'fakekey 1234', + target=None)) mockobj.assert_has_calls(calls, any_order=True) self.assertTrue(os.path.isfile(filename)) @@ -329,12 +363,13 @@ class TestAptSourceConfig(TestCase): """apt_src_key Test specification of a source + key """ - params = self._get_default_params() + cfg = self.wrapv1conf([cfg]) with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321') + mockobj.assert_called_with(['apt-key', 'add', '-'], + data=b'fakekey 4321', target=None) self.assertTrue(os.path.isfile(filename)) @@ -368,30 +403,31 @@ class TestAptSourceConfig(TestCase): def test_apt_src_keyonly(self): """Test specifying key without source""" - params = self._get_default_params() cfg = {'key': "fakekey 4242", 'filename': self.aptlistfile} + cfg = self.wrapv1conf([cfg]) with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) - mockobj.assert_called_once_with(('apt-key', 'add', '-'), - 'fakekey 4242') + mockobj.assert_called_once_with(['apt-key', 'add', '-'], + data=b'fakekey 4242', target=None) # filename should be ignored on key only self.assertFalse(os.path.isfile(self.aptlistfile)) def test_apt_src_keyidonly(self): """Test specification of a keyid without source""" - params = self._get_default_params() cfg = {'keyid': "03683F77", 'filename': self.aptlistfile} + cfg = self.wrapv1conf([cfg]) with mock.patch.object(util, 'subp', return_value=('fakekey 1212', '')) as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212') + mockobj.assert_called_with(['apt-key', 'add', '-'], + data=b'fakekey 1212', target=None) # filename should be ignored on key only self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -402,17 +438,18 @@ class TestAptSourceConfig(TestCase): up to addition of the key (add_apt_key_raw mocked to keep the environment as is) """ - params = self._get_default_params() + key = cfg['keyid'] + keyserver = cfg.get('keyserver', 'keyserver.ubuntu.com') + cfg = self.wrapv1conf([cfg]) with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: - with mock.patch.object(gpg, 'get_key_by_id', + with mock.patch.object(gpg, 'getkeybyid', return_value=expectedkey) as mockgetkey: - cc_apt_configure.add_apt_sources([cfg], params) + cc_apt_configure.handle("test", cfg, self.fakecloud, + None, None) - mockgetkey.assert_called_with(cfg['keyid'], - cfg.get('keyserver', - 'keyserver.ubuntu.com')) - mockkey.assert_called_with(expectedkey) + mockgetkey.assert_called_with(key, keyserver) + mockkey.assert_called_with(expectedkey, None) # filename should be ignored on key only self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -444,41 +481,38 @@ class TestAptSourceConfig(TestCase): def test_apt_src_ppa(self): """Test adding a ppa""" - params = self._get_default_params() cfg = {'source': 'ppa:smoser/cloud-init-test', 'filename': self.aptlistfile} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search + cfg = self.wrapv1conf([cfg]) with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params, - aa_repo_match=matcher) + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) mockobj.assert_called_once_with(['add-apt-repository', - 'ppa:smoser/cloud-init-test']) + 'ppa:smoser/cloud-init-test'], + target=None) # adding ppa should ignore filename (uses add-apt-repository) self.assertFalse(os.path.isfile(self.aptlistfile)) def test_apt_src_ppa_tri(self): """Test adding three ppa's""" - params = self._get_default_params() cfg1 = {'source': 'ppa:smoser/cloud-init-test', 'filename': self.aptlistfile} cfg2 = {'source': 'ppa:smoser/cloud-init-test2', 'filename': self.aptlistfile2} cfg3 = {'source': 'ppa:smoser/cloud-init-test3', 'filename': self.aptlistfile3} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search + cfg = self.wrapv1conf([cfg1, cfg2, cfg3]) with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params, - aa_repo_match=matcher) - calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])] + cc_apt_configure.handle("test", cfg, self.fakecloud, + None, None) + calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'], + target=None), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'], + target=None), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'], + target=None)] mockobj.assert_has_calls(calls, any_order=True) # adding ppa should ignore all filenames (uses add-apt-repository) @@ -503,14 +537,15 @@ class TestAptSourceConfig(TestCase): 'source': 'deb $MIRROR $RELEASE ' 'universe'}} - newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3]) + newcfg = cc_apt_configure.convert_v1_to_v2_apt_format([cfg1, cfg2, + cfg3]) self.assertEqual(newcfg, checkcfg) - newcfg2 = cc_apt_configure.convert_to_new_format(newcfg) + newcfg2 = cc_apt_configure.convert_v1_to_v2_apt_format(newcfg) self.assertEqual(newcfg2, checkcfg) with self.assertRaises(ValueError): - cc_apt_configure.convert_to_new_format(5) + cc_apt_configure.convert_v1_to_v2_apt_format(5) # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py new file mode 100644 index 00000000..75556b6d --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py @@ -0,0 +1,1103 @@ +"""test_handler_apt_source_v3 +Testing various config variations of the apt_source custom config +This tries to call all in the new v3 format and cares about new features +""" +import glob +import os +import re +import shutil +import socket +import tempfile + +from unittest import TestCase + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import gpg +from cloudinit import helpers +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from .. import helpers as t_help + +EXPECTEDKEY = u"""-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ +NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy +8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 +HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG +CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 +OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ +FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm +S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== +=ACB2 +-----END PGP PUBLIC KEY BLOCK-----""" + +ADD_APT_REPO_MATCH = r"^[\w-]+:\w" + +TARGET = None + + +def load_tfile(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class TestAptSourceConfig(t_help.FilesystemMockingTestCase): + """TestAptSourceConfig + Main Class to test apt configs + """ + def setUp(self): + super(TestAptSourceConfig, self).setUp() + self.tmp = tempfile.mkdtemp() + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.aptlistfile = os.path.join(self.tmp, "single-deb.list") + self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") + self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") + self.join = os.path.join + self.matcher = re.compile(ADD_APT_REPO_MATCH).search + + @staticmethod + def _add_apt_sources(*args, **kwargs): + with mock.patch.object(cc_apt_configure, 'update_packages'): + cc_apt_configure.add_apt_sources(*args, **kwargs) + + @staticmethod + def _get_default_params(): + """get_default_params + Get the most basic default mrror and release info to be used in tests + """ + params = {} + params['RELEASE'] = util.lsb_release()['codename'] + arch = 'amd64' + params['MIRROR'] = cc_apt_configure.\ + get_default_mirrors(arch)["PRIMARY"] + return params + + def _myjoin(self, *args, **kwargs): + """_myjoin - redir into writable tmpdir""" + if (args[0] == "/etc/apt/sources.list.d/" and + args[1] == "cloud_config_sources.list" and + len(args) == 2): + return self.join(self.tmp, args[0].lstrip("/"), args[1]) + else: + return self.join(*args, **kwargs) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def _apt_src_basic(self, filename, cfg): + """_apt_src_basic + Test Fix deb source string, has to overwrite mirror conf in params + """ + params = self._get_default_params() + + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "karmic-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_basic(self): + """test_apt_v3_src_basic - Test fix deb source string""" + cfg = {self.aptlistfile: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}} + self._apt_src_basic(self.aptlistfile, cfg) + + def test_apt_v3_src_basic_tri(self): + """test_apt_v3_src_basic_tri - Test multiple fix deb source strings""" + cfg = {self.aptlistfile: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}, + self.aptlistfile2: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted')}, + self.aptlistfile3: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted')}} + self._apt_src_basic(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "precise-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "lucid-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def _apt_src_replacement(self, filename, cfg): + """apt_src_replace + Test Autoreplacement of MIRROR and RELEASE in source specs + """ + params = self._get_default_params() + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_replace(self): + """test_apt_v3_src_replace - Test replacement of MIRROR & RELEASE""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}} + self._apt_src_replacement(self.aptlistfile, cfg) + + def test_apt_v3_src_replace_fn(self): + """test_apt_v3_src_replace_fn - Test filename overwritten in dict""" + cfg = {'ignored': {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile}} + # second file should overwrite the dict key + self._apt_src_replacement(self.aptlistfile, cfg) + + def _apt_src_replace_tri(self, cfg): + """_apt_src_replace_tri + Test three autoreplacements of MIRROR and RELEASE in source specs with + generic part + """ + self._apt_src_replacement(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + params = self._get_default_params() + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "main"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "universe"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_replace_tri(self): + """test_apt_v3_src_replace_tri - Test multiple replace/overwrites""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, + 'notused': {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} + self._apt_src_replace_tri(cfg) + + def _apt_src_keyid(self, filename, cfg, keynum): + """_apt_src_keyid + Test specification of a source + keyid + """ + params = self._get_default_params() + + with mock.patch("cloudinit.util.subp", + return_value=('fakekey 1234', '')) as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + # check if it added the right ammount of keys + calls = [] + for _ in range(keynum): + calls.append(call(['apt-key', 'add', '-'], data=b'fakekey 1234', + target=TARGET)) + mockobj.assert_has_calls(calls, any_order=True) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_keyid(self): + """test_apt_v3_src_keyid - Test source + keyid with filename""" + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"}} + self._apt_src_keyid(self.aptlistfile, cfg, 1) + + def test_apt_v3_src_keyid_tri(self): + """test_apt_v3_src_keyid_tri - Test multiple src+key+filen writes""" + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"}, + 'ignored': {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial universe'), + 'keyid': "03683F77", + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial multiverse'), + 'keyid': "03683F77"}} + + self._apt_src_keyid(self.aptlistfile, cfg, 3) + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "universe"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_key(self): + """test_apt_v3_src_key - Test source + key""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321"}} + + with mock.patch.object(util, 'subp') as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4321', + target=TARGET) + + self.assertTrue(os.path.isfile(self.aptlistfile)) + + contents = load_tfile(self.aptlistfile) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_keyonly(self): + """test_apt_v3_src_keyonly - Test key without source""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'key': "fakekey 4242"}} + + with mock.patch.object(util, 'subp') as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4242', + target=TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_keyidonly(self): + """test_apt_v3_src_keyidonly - Test keyid without source""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'keyid': "03683F77"}} + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1212', '')) as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 1212', + target=TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def apt_src_keyid_real(self, cfg, expectedkey): + """apt_src_keyid_real + Test specification of a keyid without source including + up to addition of the key (add_apt_key_raw mocked to keep the + environment as is) + """ + params = self._get_default_params() + + with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: + with mock.patch.object(gpg, 'getkeybyid', + return_value=expectedkey) as mockgetkey: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + keycfg = cfg[self.aptlistfile] + mockgetkey.assert_called_with(keycfg['keyid'], + keycfg.get('keyserver', + 'keyserver.ubuntu.com')) + mockkey.assert_called_with(expectedkey, TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_keyid_real(self): + """test_apt_v3_src_keyid_real - Test keyid including key add""" + keyid = "03683F77" + cfg = {self.aptlistfile: {'keyid': keyid}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_longkeyid_real(self): + """test_apt_v3_src_longkeyid_real Test long keyid including key add""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {self.aptlistfile: {'keyid': keyid}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_longkeyid_ks_real(self): + """test_apt_v3_src_longkeyid_ks_real Test long keyid from other ks""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {self.aptlistfile: {'keyid': keyid, + 'keyserver': 'keys.gnupg.net'}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_keyid_keyserver(self): + """test_apt_v3_src_keyid_keyserver - Test custom keyserver""" + keyid = "03683F77" + params = self._get_default_params() + cfg = {self.aptlistfile: {'keyid': keyid, + 'keyserver': 'test.random.com'}} + + # in some test environments only *.ubuntu.com is reachable + # so mock the call and check if the config got there + with mock.patch.object(gpg, 'getkeybyid', + return_value="fakekey") as mockgetkey: + with mock.patch.object(cc_apt_configure, + 'add_apt_key_raw') as mockadd: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockgetkey.assert_called_with('03683F77', 'test.random.com') + mockadd.assert_called_with('fakekey', TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_ppa(self): + """test_apt_v3_src_ppa - Test specification of a ppa""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'}} + + with mock.patch("cloudinit.util.subp") as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + mockobj.assert_any_call(['add-apt-repository', + 'ppa:smoser/cloud-init-test'], target=TARGET) + + # adding ppa should ignore filename (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_ppa_tri(self): + """test_apt_v3_src_ppa_tri - Test specification of multiple ppa's""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'}, + self.aptlistfile2: {'source': 'ppa:smoser/cloud-init-test2'}, + self.aptlistfile3: {'source': 'ppa:smoser/cloud-init-test3'}} + + with mock.patch("cloudinit.util.subp") as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'], + target=TARGET), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'], + target=TARGET), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'], + target=TARGET)] + mockobj.assert_has_calls(calls, any_order=True) + + # adding ppa should ignore all filenames (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + self.assertFalse(os.path.isfile(self.aptlistfile2)) + self.assertFalse(os.path.isfile(self.aptlistfile3)) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_list_rename(self, m_get_architecture): + """test_apt_v3_list_rename - Test find mirror and apt list renaming""" + pre = "/var/lib/apt/lists" + # filenames are archive dependent + + arch = 's390x' + m_get_architecture.return_value = arch + component = "ubuntu-ports" + archive = "ports.ubuntu.com" + + cfg = {'primary': [{'arches': ["default"], + 'uri': + 'http://test.ubuntu.com/%s/' % component}], + 'security': [{'arches': ["default"], + 'uri': + 'http://testsec.ubuntu.com/%s/' % component}]} + post = ("%s_dists_%s-updates_InRelease" % + (component, util.lsb_release()['codename'])) + fromfn = ("%s/%s_%s" % (pre, archive, post)) + tofn = ("%s/test.ubuntu.com_%s" % (pre, post)) + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + + self.assertEqual(mirrors['MIRROR'], + "http://test.ubuntu.com/%s/" % component) + self.assertEqual(mirrors['PRIMARY'], + "http://test.ubuntu.com/%s/" % component) + self.assertEqual(mirrors['SECURITY'], + "http://testsec.ubuntu.com/%s/" % component) + + with mock.patch.object(os, 'rename') as mockren: + with mock.patch.object(glob, 'glob', + return_value=[fromfn]): + cc_apt_configure.rename_apt_lists(mirrors, TARGET) + + mockren.assert_any_call(fromfn, tofn) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_list_rename_non_slash(self, m_get_architecture): + target = os.path.join(self.tmp, "rename_non_slash") + apt_lists_d = os.path.join(target, "./" + cc_apt_configure.APT_LISTS) + + m_get_architecture.return_value = 'amd64' + + mirror_path = "some/random/path/" + primary = "http://test.ubuntu.com/" + mirror_path + security = "http://test-security.ubuntu.com/" + mirror_path + mirrors = {'PRIMARY': primary, 'SECURITY': security} + + # these match default archive prefixes + opri_pre = "archive.ubuntu.com_ubuntu_dists_xenial" + osec_pre = "security.ubuntu.com_ubuntu_dists_xenial" + # this one won't match and should not be renamed defaults. + other_pre = "dl.google.com_linux_chrome_deb_dists_stable" + # these are our new expected prefixes + npri_pre = "test.ubuntu.com_some_random_path_dists_xenial" + nsec_pre = "test-security.ubuntu.com_some_random_path_dists_xenial" + + files = [ + # orig prefix, new prefix, suffix + (opri_pre, npri_pre, "_main_binary-amd64_Packages"), + (opri_pre, npri_pre, "_main_binary-amd64_InRelease"), + (opri_pre, npri_pre, "-updates_main_binary-amd64_Packages"), + (opri_pre, npri_pre, "-updates_main_binary-amd64_InRelease"), + (other_pre, other_pre, "_main_binary-amd64_Packages"), + (other_pre, other_pre, "_Release"), + (other_pre, other_pre, "_Release.gpg"), + (osec_pre, nsec_pre, "_InRelease"), + (osec_pre, nsec_pre, "_main_binary-amd64_Packages"), + (osec_pre, nsec_pre, "_universe_binary-amd64_Packages"), + ] + + expected = sorted([npre + suff for opre, npre, suff in files]) + # create files + for (opre, npre, suff) in files: + fpath = os.path.join(apt_lists_d, opre + suff) + util.write_file(fpath, content=fpath) + + cc_apt_configure.rename_apt_lists(mirrors, target) + found = sorted(os.listdir(apt_lists_d)) + self.assertEqual(expected, found) + + @staticmethod + def test_apt_v3_proxy(): + """test_apt_v3_proxy - Test apt_*proxy configuration""" + cfg = {"proxy": "foobar1", + "http_proxy": "foobar2", + "ftp_proxy": "foobar3", + "https_proxy": "foobar4"} + + with mock.patch.object(util, 'write_file') as mockobj: + cc_apt_configure.apply_apt_config(cfg, "proxyfn", "notused") + + mockobj.assert_called_with('proxyfn', + ('Acquire::http::Proxy "foobar1";\n' + 'Acquire::http::Proxy "foobar2";\n' + 'Acquire::ftp::Proxy "foobar3";\n' + 'Acquire::https::Proxy "foobar4";\n')) + + def test_apt_v3_mirror(self): + """test_apt_v3_mirror - Test defining a mirror""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir}], + "security": [{'arches': ["default"], + "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_default(self): + """test_apt_v3_mirror_default - Test without defining a mirror""" + arch = 'amd64' + default_mirrors = cc_apt_configure.get_default_mirrors(arch) + pmir = default_mirrors["PRIMARY"] + smir = default_mirrors["SECURITY"] + mycloud = self._get_cloud('ubuntu') + mirrors = cc_apt_configure.find_apt_mirror_info({}, mycloud, arch) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_arches(self): + """test_apt_v3_mirror_arches - Test arches selection of mirror""" + pmir = "http://my-primary.ubuntu.com/ubuntu/" + smir = "http://my-security.ubuntu.com/ubuntu/" + arch = 'ppc64el' + cfg = {"primary": [{'arches': ["default"], "uri": "notthis-primary"}, + {'arches': [arch], "uri": pmir}], + "security": [{'arches': ["default"], "uri": "nothis-security"}, + {'arches': [arch], "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + + self.assertEqual(mirrors['PRIMARY'], pmir) + self.assertEqual(mirrors['MIRROR'], pmir) + self.assertEqual(mirrors['SECURITY'], smir) + + def test_apt_v3_mirror_arches_default(self): + """test_apt_v3_mirror_arches - Test falling back to default arch""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir}, + {'arches': ["thisarchdoesntexist"], + "uri": "notthis"}], + "security": [{'arches': ["thisarchdoesntexist"], + "uri": "nothat"}, + {'arches': ["default"], + "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_get_def_mir_non_intel_no_arch(self, m_get_architecture): + arch = 'ppc64el' + m_get_architecture.return_value = arch + expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports', + 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'} + self.assertEqual(expected, cc_apt_configure.get_default_mirrors()) + + def test_apt_v3_get_default_mirrors_non_intel_with_arch(self): + found = cc_apt_configure.get_default_mirrors('ppc64el') + + expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports', + 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'} + self.assertEqual(expected, found) + + def test_apt_v3_mirror_arches_sysdefault(self): + """test_apt_v3_mirror_arches - Test arches fallback to sys default""" + arch = 'amd64' + default_mirrors = cc_apt_configure.get_default_mirrors(arch) + pmir = default_mirrors["PRIMARY"] + smir = default_mirrors["SECURITY"] + mycloud = self._get_cloud('ubuntu') + cfg = {"primary": [{'arches': ["thisarchdoesntexist_64"], + "uri": "notthis"}, + {'arches': ["thisarchdoesntexist"], + "uri": "notthiseither"}], + "security": [{'arches': ["thisarchdoesntexist"], + "uri": "nothat"}, + {'arches': ["thisarchdoesntexist_64"], + "uri": "nothateither"}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + + self.assertEqual(mirrors['MIRROR'], pmir) + self.assertEqual(mirrors['PRIMARY'], pmir) + self.assertEqual(mirrors['SECURITY'], smir) + + def test_apt_v3_mirror_search(self): + """test_apt_v3_mirror_search - Test searching mirrors in a list + mock checks to avoid relying on network connectivity""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "search": ["pfailme", pmir]}], + "security": [{'arches': ["default"], + "search": ["sfailme", smir]}]} + + with mock.patch.object(cc_apt_configure, 'search_for_mirror', + side_effect=[pmir, smir]) as mocksearch: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, + 'amd64') + + calls = [call(["pfailme", pmir]), + call(["sfailme", smir])] + mocksearch.assert_has_calls(calls) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_search_many2(self): + """test_apt_v3_mirror_search_many3 - Test both mirrors specs at once""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir, + "search": ["pfailme", "foo"]}], + "security": [{'arches': ["default"], + "uri": smir, + "search": ["sfailme", "bar"]}]} + + arch = 'amd64' + + # should be called only once per type, despite two mirror configs + mycloud = None + with mock.patch.object(cc_apt_configure, 'get_mirror', + return_value="http://mocked/foo") as mockgm: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(cfg, 'primary', arch, mycloud), + call(cfg, 'security', arch, mycloud)] + mockgm.assert_has_calls(calls) + + # should not be called, since primary is specified + with mock.patch.object(cc_apt_configure, + 'search_for_mirror') as mockse: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + mockse.assert_not_called() + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_url_resolvable(self): + """test_apt_v3_url_resolvable - Test resolving urls""" + + with mock.patch.object(util, 'is_resolvable') as mockresolve: + util.is_resolvable_url("http://1.2.3.4/ubuntu") + mockresolve.assert_called_with("1.2.3.4") + + with mock.patch.object(util, 'is_resolvable') as mockresolve: + util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu") + mockresolve.assert_called_with("us.archive.ubuntu.com") + + # former tests can leave this set (or not if the test is ran directly) + # do a hard reset to ensure a stable result + util._DNS_REDIRECT_IP = None + bad = [(None, None, None, "badname", ["10.3.2.1"])] + good = [(None, None, None, "goodname", ["10.2.3.4"])] + with mock.patch.object(socket, 'getaddrinfo', + side_effect=[bad, bad, bad, good, + good]) as mocksock: + ret = util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu") + ret2 = util.is_resolvable_url("http://1.2.3.4/ubuntu") + mocksock.assert_any_call('does-not-exist.example.com.', None, + 0, 0, 1, 2) + mocksock.assert_any_call('example.invalid.', None, 0, 0, 1, 2) + mocksock.assert_any_call('us.archive.ubuntu.com', None) + mocksock.assert_any_call('1.2.3.4', None) + + self.assertTrue(ret) + self.assertTrue(ret2) + + # side effect need only bad ret after initial call + with mock.patch.object(socket, 'getaddrinfo', + side_effect=[bad]) as mocksock: + ret3 = util.is_resolvable_url("http://failme.com/ubuntu") + calls = [call('failme.com', None)] + mocksock.assert_has_calls(calls) + self.assertFalse(ret3) + + def test_apt_v3_disable_suites(self): + """test_disable_suites - disable_suites with many configurations""" + release = "xenial" + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + + # disable nothing + disabled = [] + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable release suite + disabled = ["$RELEASE"] + expect = """\ +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable other suite + disabled = ["$RELEASE-updates"] + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu""" + """ xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # multi disable + disabled = ["$RELEASE-updates", "$RELEASE-security"] + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # multi line disable (same suite multiple times in input) + disabled = ["$RELEASE-updates", "$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://UBUNTU.com//ubuntu xenial-updates main +deb http://UBUNTU.COM//ubuntu xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +# suite disabled by cloud-init: deb http://UBUNTU.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """ + """xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # comment in input + disabled = ["$RELEASE-updates", "$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +#foo +#deb http://UBUNTU.com//ubuntu xenial-updates main +deb http://UBUNTU.COM//ubuntu xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +#foo +#deb http://UBUNTU.com//ubuntu xenial-updates main +# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """ + """xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable custom suite + disabled = ["foobar"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ foobar main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +# suite disabled by cloud-init: deb http://ubuntu.com/ubuntu/ foobar main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable non existing suite + disabled = ["foobar"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ notfoobar main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ notfoobar main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite with option + disabled = ["$RELEASE-updates"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [a=b] http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb [a=b] http://ubu.com//ubu """ + """xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite with more options and auto $RELEASE expansion + disabled = ["updates"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [a=b c=d] http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb [a=b c=d] \ +http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite while options at others + disabled = ["$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + def test_disable_suites_blank_lines(self): + """test_disable_suites_blank_lines - ensure blank lines allowed""" + lines = ["deb %(repo)s %(rel)s main universe", + "", + "deb %(repo)s %(rel)s-updates main universe", + " # random comment", + "#comment here", + ""] + rel = "trusty" + repo = 'http://example.com/mirrors/ubuntu' + orig = "\n".join(lines) % {'repo': repo, 'rel': rel} + self.assertEqual( + orig, cc_apt_configure.disable_suites(["proposed"], orig, rel)) + + def test_apt_v3_mirror_search_dns(self): + """test_apt_v3_mirror_search_dns - Test searching dns patterns""" + pmir = "phit" + smir = "shit" + arch = 'amd64' + mycloud = self._get_cloud('ubuntu') + cfg = {"primary": [{'arches': ["default"], + "search_dns": True}], + "security": [{'arches': ["default"], + "search_dns": True}]} + + with mock.patch.object(cc_apt_configure, 'get_mirror', + return_value="http://mocked/foo") as mockgm: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(cfg, 'primary', arch, mycloud), + call(cfg, 'security', arch, mycloud)] + mockgm.assert_has_calls(calls) + + with mock.patch.object(cc_apt_configure, 'search_for_mirror_dns', + return_value="http://mocked/foo") as mocksdns: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(True, 'primary', cfg, mycloud), + call(True, 'security', cfg, mycloud)] + mocksdns.assert_has_calls(calls) + + # first return is for the non-dns call before + with mock.patch.object(cc_apt_configure, 'search_for_mirror', + side_effect=[None, pmir, None, smir]) as mockse: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + + calls = [call(None), + call(['http://ubuntu-mirror.localdomain/ubuntu', + 'http://ubuntu-mirror/ubuntu']), + call(None), + call(['http://ubuntu-security-mirror.localdomain/ubuntu', + 'http://ubuntu-security-mirror/ubuntu'])] + mockse.assert_has_calls(calls) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + +class TestDebconfSelections(TestCase): + + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + def test_no_set_sel_if_none_to_set(self, m_set_sel): + cc_apt_configure.apply_debconf_selections({'foo': 'bar'}) + m_set_sel.assert_not_called() + + @mock.patch("cloudinit.config.cc_apt_configure." + "debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_set_sel_call_has_expected_input(self, m_get_inst, m_set_sel): + data = { + 'set1': 'pkga pkga/q1 mybool false', + 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n' + 'pkgc\tpkgc/ip\tstring\t10.0.0.1')} + lines = '\n'.join(data.values()).split('\n') + + m_get_inst.return_value = ["adduser", "apparmor"] + m_set_sel.return_value = None + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + self.assertTrue(m_get_inst.called) + self.assertEqual(m_set_sel.call_count, 1) + + # assumes called with *args value. + selections = m_set_sel.call_args_list[0][0][0].decode() + + missing = [l for l in lines if l not in selections.splitlines()] + self.assertEqual([], missing) + + @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure") + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_reconfigure_if_intersection(self, m_get_inst, m_set_sel, + m_dpkg_r): + data = { + 'set1': 'pkga pkga/q1 mybool false', + 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n' + 'pkgc\tpkgc/ip\tstring\t10.0.0.1'), + 'cloud-init': ('cloud-init cloud-init/datasources' + 'multiselect MAAS')} + + m_set_sel.return_value = None + m_get_inst.return_value = ["adduser", "apparmor", "pkgb", + "cloud-init", 'zdog'] + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + + # reconfigure should be called with the intersection + # of (packages in config, packages installed) + self.assertEqual(m_dpkg_r.call_count, 1) + # assumes called with *args (dpkg_reconfigure([a,b,c], target=)) + packages = m_dpkg_r.call_args_list[0][0][0] + self.assertEqual(set(['cloud-init', 'pkgb']), set(packages)) + + @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure") + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_reconfigure_if_no_intersection(self, m_get_inst, m_set_sel, + m_dpkg_r): + data = {'set1': 'pkga pkga/q1 mybool false'} + + m_get_inst.return_value = ["adduser", "apparmor", "pkgb", + "cloud-init", 'zdog'] + m_set_sel.return_value = None + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + + self.assertTrue(m_get_inst.called) + self.assertEqual(m_dpkg_r.call_count, 0) + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_does_reconfigure(self, m_subp): + target = "/foo-target" + + # due to the way the cleaners are called (via dictionary reference) + # mocking clean_cloud_init directly does not work. So we mock + # the CONFIG_CLEANERS dictionary and assert our cleaner is called. + ci_cleaner = mock.MagicMock() + with mock.patch.dict(("cloudinit.config.cc_apt_configure." + "CONFIG_CLEANERS"), + values={'cloud-init': ci_cleaner}, clear=True): + cc_apt_configure.dpkg_reconfigure(['pkga', 'cloud-init'], + target=target) + # cloud-init is actually the only package we have a cleaner for + # so for now, its the only one that should reconfigured + self.assertTrue(m_subp.called) + ci_cleaner.assert_called_with(target) + self.assertEqual(m_subp.call_count, 1) + found = m_subp.call_args_list[0][0][0] + expected = ['dpkg-reconfigure', '--frontend=noninteractive', + 'cloud-init'] + self.assertEqual(expected, found) + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_not_done_on_no_data(self, m_subp): + cc_apt_configure.dpkg_reconfigure([]) + m_subp.assert_not_called() + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_not_done_if_no_cleaners(self, m_subp): + cc_apt_configure.dpkg_reconfigure(['pkgfoo', 'pkgbar']) + m_subp.assert_not_called() + +# +# vi: ts=4 expandtab diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 73369cd3..d2031f59 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -508,4 +508,73 @@ class TestReadSeeded(helpers.TestCase): self.assertEqual(found_md, {'key1': 'val1'}) self.assertEqual(found_ud, ud) + +class TestSubp(helpers.TestCase): + + stdin2err = ['bash', '-c', 'cat >&2'] + stdin2out = ['cat'] + utf8_invalid = b'ab\xaadef' + utf8_valid = b'start \xc3\xa9 end' + utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' + + def printf_cmd(self, *args): + # bash's printf supports \xaa. So does /usr/bin/printf + # but by using bash, we remove dependency on another program. + return(['bash', '-c', 'printf "$@"', 'printf'] + list(args)) + + def test_subp_handles_utf8(self): + # The given bytes contain utf-8 accented characters as seen in e.g. + # the "deja dup" package in Ubuntu. + cmd = self.printf_cmd(self.utf8_valid_2) + (out, _err) = util.subp(cmd, capture=True) + self.assertEqual(out, self.utf8_valid_2.decode('utf-8')) + + def test_subp_respects_decode_false(self): + (out, err) = util.subp(self.stdin2out, capture=True, decode=False, + data=self.utf8_valid) + self.assertTrue(isinstance(out, bytes)) + self.assertTrue(isinstance(err, bytes)) + self.assertEqual(out, self.utf8_valid) + + def test_subp_decode_ignore(self): + # this executes a string that writes invalid utf-8 to stdout + (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'), + capture=True, decode='ignore') + self.assertEqual(out, 'abcdef') + + def test_subp_decode_strict_valid_utf8(self): + (out, _err) = util.subp(self.stdin2out, capture=True, + decode='strict', data=self.utf8_valid) + self.assertEqual(out, self.utf8_valid.decode('utf-8')) + + def test_subp_decode_invalid_utf8_replaces(self): + (out, _err) = util.subp(self.stdin2out, capture=True, + data=self.utf8_invalid) + expected = self.utf8_invalid.decode('utf-8', errors='replace') + self.assertEqual(out, expected) + + def test_subp_decode_strict_raises(self): + args = [] + kwargs = {'args': self.stdin2out, 'capture': True, + 'decode': 'strict', 'data': self.utf8_invalid} + self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs) + + def test_subp_capture_stderr(self): + data = b'hello world' + (out, err) = util.subp(self.stdin2err, capture=True, + decode=False, data=data) + self.assertEqual(err, data) + self.assertEqual(out, b'') + + def test_returns_none_if_no_capture(self): + (out, err) = util.subp(self.stdin2out, data=b'', capture=False) + self.assertEqual(err, None) + self.assertEqual(out, None) + + def test_bunch_of_slashes_in_path(self): + self.assertEqual("/target/my/path/", + util.target_path("/target/", "//my/path/")) + self.assertEqual("/target/my/path/", + util.target_path("/target/", "///my/path/")) + # vi: ts=4 expandtab |