From d861415ff9ab816b1183b8c58ec35348be4fd458 Mon Sep 17 00:00:00 2001 From: Christian Ehrhardt Date: Wed, 10 Aug 2016 16:43:14 +0200 Subject: Apt: add new apt configuration format This adds an improved apt configuration format that is fully backwards compatible with previous behavior. This is mostly copied from curtin's implementation. It does: * clean up and centralizes many of the top level 'apt_*' values that previously existed into a single top level 'apt'key. * support a 'source' in apt/sources/entry that has only a key * documents new features and adds tests. See the added doc/examples/cloud-config-apt.txt for more information. --- tests/unittests/test_distros/test_generic.py | 3 - .../test_handler/test_handler_apt_conf_v1.py | 109 ++ .../test_handler/test_handler_apt_configure.py | 109 -- .../test_handler_apt_configure_sources_list.py | 180 ---- .../test_handler_apt_configure_sources_list_v1.py | 200 ++++ .../test_handler_apt_configure_sources_list_v3.py | 187 ++++ .../test_handler/test_handler_apt_source.py | 516 --------- .../test_handler/test_handler_apt_source_v1.py | 551 ++++++++++ .../test_handler/test_handler_apt_source_v3.py | 1103 ++++++++++++++++++++ tests/unittests/test_util.py | 69 ++ 10 files changed, 2219 insertions(+), 808 deletions(-) create mode 100644 tests/unittests/test_handler/test_handler_apt_conf_v1.py delete mode 100644 tests/unittests/test_handler/test_handler_apt_configure.py delete mode 100644 tests/unittests/test_handler/test_handler_apt_configure_sources_list.py create mode 100644 tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py create mode 100644 tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py delete mode 100644 tests/unittests/test_handler/test_handler_apt_source.py create mode 100644 tests/unittests/test_handler/test_handler_apt_source_v1.py create mode 100644 tests/unittests/test_handler/test_handler_apt_source_v3.py (limited to 'tests/unittests') diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index 96fa0811..24ad115f 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -226,8 +226,5 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): os.symlink('/', '/run/systemd/system') self.assertFalse(d.uses_systemd()) -# def _get_package_mirror_info(mirror_info, availability_zone=None, -# mirror_filter=util.search_for_mirror): - # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_conf_v1.py b/tests/unittests/test_handler/test_handler_apt_conf_v1.py new file mode 100644 index 00000000..95fd1da2 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_conf_v1.py @@ -0,0 +1,109 @@ +from cloudinit.config import cc_apt_configure +from cloudinit import util + +from ..helpers import TestCase + +import os +import re +import shutil +import tempfile + + +def load_tfile_or_url(*args, **kwargs): + return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)) + + +class TestAptProxyConfig(TestCase): + def setUp(self): + super(TestAptProxyConfig, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.pfile = os.path.join(self.tmp, "proxy.cfg") + self.cfile = os.path.join(self.tmp, "config.cfg") + + def _search_apt_config(self, contents, ptype, value): + return re.search( + r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), + contents, flags=re.IGNORECASE) + + def test_apt_proxy_written(self): + cfg = {'proxy': 'myproxy'} + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = load_tfile_or_url(self.pfile) + self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) + + def test_apt_http_proxy_written(self): + cfg = {'http_proxy': 'myproxy'} + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = load_tfile_or_url(self.pfile) + self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) + + def test_apt_all_proxy_written(self): + cfg = {'http_proxy': 'myproxy_http_proxy', + 'https_proxy': 'myproxy_https_proxy', + 'ftp_proxy': 'myproxy_ftp_proxy'} + + values = {'http': cfg['http_proxy'], + 'https': cfg['https_proxy'], + 'ftp': cfg['ftp_proxy'], + } + + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = load_tfile_or_url(self.pfile) + + for ptype, pval in values.items(): + self.assertTrue(self._search_apt_config(contents, ptype, pval)) + + def test_proxy_deleted(self): + util.write_file(self.cfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) + self.assertFalse(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + def test_proxy_replaced(self): + util.write_file(self.cfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({'proxy': "foo"}, + self.pfile, self.cfile) + self.assertTrue(os.path.isfile(self.pfile)) + contents = load_tfile_or_url(self.pfile) + self.assertTrue(self._search_apt_config(contents, "http", "foo")) + + def test_config_written(self): + payload = 'this is my apt config' + cfg = {'conf': payload} + + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.cfile)) + self.assertFalse(os.path.isfile(self.pfile)) + + self.assertEqual(load_tfile_or_url(self.cfile), payload) + + def test_config_replaced(self): + util.write_file(self.pfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({'conf': "foo"}, + self.pfile, self.cfile) + self.assertTrue(os.path.isfile(self.cfile)) + self.assertEqual(load_tfile_or_url(self.cfile), "foo") + + def test_config_deleted(self): + # if no 'conf' is provided, delete any previously written file + util.write_file(self.pfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) + self.assertFalse(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py deleted file mode 100644 index d1dca2c4..00000000 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ /dev/null @@ -1,109 +0,0 @@ -from cloudinit.config import cc_apt_configure -from cloudinit import util - -from ..helpers import TestCase - -import os -import re -import shutil -import tempfile - - -def load_tfile_or_url(*args, **kwargs): - return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)) - - -class TestAptProxyConfig(TestCase): - def setUp(self): - super(TestAptProxyConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.pfile = os.path.join(self.tmp, "proxy.cfg") - self.cfile = os.path.join(self.tmp, "config.cfg") - - def _search_apt_config(self, contents, ptype, value): - return re.search( - r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), - contents, flags=re.IGNORECASE) - - def test_apt_proxy_written(self): - cfg = {'apt_proxy': 'myproxy'} - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) - - def test_apt_http_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy'} - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) - - def test_apt_all_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy_http_proxy', - 'apt_https_proxy': 'myproxy_https_proxy', - 'apt_ftp_proxy': 'myproxy_ftp_proxy'} - - values = {'http': cfg['apt_http_proxy'], - 'https': cfg['apt_https_proxy'], - 'ftp': cfg['apt_ftp_proxy'], - } - - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - - for ptype, pval in values.items(): - self.assertTrue(self._search_apt_config(contents, ptype, pval)) - - def test_proxy_deleted(self): - util.write_file(self.cfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) - self.assertFalse(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - def test_proxy_replaced(self): - util.write_file(self.cfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_proxy': "foo"}, - self.pfile, self.cfile) - self.assertTrue(os.path.isfile(self.pfile)) - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "foo")) - - def test_config_written(self): - payload = 'this is my apt config' - cfg = {'apt_config': payload} - - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.cfile)) - self.assertFalse(os.path.isfile(self.pfile)) - - self.assertEqual(load_tfile_or_url(self.cfile), payload) - - def test_config_replaced(self): - util.write_file(self.pfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_config': "foo"}, - self.pfile, self.cfile) - self.assertTrue(os.path.isfile(self.cfile)) - self.assertEqual(load_tfile_or_url(self.cfile), "foo") - - def test_config_deleted(self): - # if no 'apt_config' is provided, delete any previously written file - util.write_file(self.pfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) - self.assertFalse(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py deleted file mode 100644 index acde0863..00000000 --- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py +++ /dev/null @@ -1,180 +0,0 @@ -""" test_handler_apt_configure_sources_list -Test templating of sources list -""" -import logging -import os -import shutil -import tempfile - -try: - from unittest import mock -except ImportError: - import mock - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import templater -from cloudinit import util - -from cloudinit.config import cc_apt_configure -from cloudinit.sources import DataSourceNone - -from cloudinit.distros.debian import Distro - -from .. import helpers as t_help - -LOG = logging.getLogger(__name__) - -YAML_TEXT_CUSTOM_SL = """ -apt_mirror: http://archive.ubuntu.com/ubuntu/ -apt_custom_sources_list: | - ## template:jinja - ## Note, this file is written by cloud-init on first boot of an instance - ## modifications made here will not survive a re-bundle. - ## if you wish to make changes you can: - ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg - ## or do the same in user-data - ## b.) add sources in /etc/apt/sources.list.d - ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl - - # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to - # newer versions of the distribution. - deb {{mirror}} {{codename}} main restricted - deb-src {{mirror}} {{codename}} main restricted - # FIND_SOMETHING_SPECIAL -""" - -EXPECTED_CONVERTED_CONTENT = ( - """## Note, this file is written by cloud-init on first boot of an instance -## modifications made here will not survive a re-bundle. -## if you wish to make changes you can: -## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg -## or do the same in user-data -## b.) add sources in /etc/apt/sources.list.d -## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl - -# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to -# newer versions of the distribution. -deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted -deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted -# FIND_SOMETHING_SPECIAL -""") - - -def load_tfile_or_url(*args, **kwargs): - """load_tfile_or_url - load file and return content after decoding - """ - return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) - - -class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): - """TestAptSourceConfigSourceList - Main Class to test sources list rendering - """ - def setUp(self): - super(TestAptSourceConfigSourceList, self).setUp() - self.subp = util.subp - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - - def _get_cloud(self, distro, metadata=None): - self.patchUtils(self.new_root) - paths = helpers.Paths({}) - cls = distros.fetch(distro) - mydist = cls(distro, {}, paths) - myds = DataSourceNone.DataSourceNone({}, mydist, paths) - if metadata: - myds.metadata.update(metadata) - return cloud.Cloud(myds, paths, {}, mydist, None) - - def apt_source_list(self, distro, mirror, mirrorcheck=None): - """apt_source_list - Test rendering of a source.list from template for a given distro - """ - if mirrorcheck is None: - mirrorcheck = mirror - - if isinstance(mirror, list): - cfg = {'apt_mirror_search': mirror} - else: - cfg = {'apt_mirror': mirror} - mycloud = self._get_cloud(distro) - - with mock.patch.object(templater, 'render_to_file') as mocktmpl: - with mock.patch.object(os.path, 'isfile', - return_value=True) as mockisfile: - with mock.patch.object(util, 'rename'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) - - mockisfile.assert_any_call( - ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) - mocktmpl.assert_called_once_with( - ('/etc/cloud/templates/sources.list.%s.tmpl' % distro), - '/etc/apt/sources.list', - {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck}) - - def test_apt_source_list_debian(self): - """Test rendering of a source.list from template for debian""" - self.apt_source_list('debian', 'http://httpredir.debian.org/debian') - - def test_apt_source_list_ubuntu(self): - """Test rendering of a source.list from template for ubuntu""" - self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/') - - @staticmethod - def myresolve(name): - """Fake util.is_resolvable for mirrorfail tests""" - if name == "does.not.exist": - print("Faking FAIL for '%s'" % name) - return False - else: - print("Faking SUCCESS for '%s'" % name) - return True - - def test_apt_srcl_debian_mirrorfail(self): - """Test rendering of a source.list from template for debian""" - with mock.patch.object(util, 'is_resolvable', - side_effect=self.myresolve) as mockresolve: - self.apt_source_list('debian', - ['http://does.not.exist', - 'http://httpredir.debian.org/debian'], - 'http://httpredir.debian.org/debian') - mockresolve.assert_any_call("does.not.exist") - mockresolve.assert_any_call("httpredir.debian.org") - - def test_apt_srcl_ubuntu_mirrorfail(self): - """Test rendering of a source.list from template for ubuntu""" - with mock.patch.object(util, 'is_resolvable', - side_effect=self.myresolve) as mockresolve: - self.apt_source_list('ubuntu', - ['http://does.not.exist', - 'http://archive.ubuntu.com/ubuntu/'], - 'http://archive.ubuntu.com/ubuntu/') - mockresolve.assert_any_call("does.not.exist") - mockresolve.assert_any_call("archive.ubuntu.com") - - def test_apt_srcl_custom(self): - """Test rendering from a custom source.list template""" - cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) - mycloud = self._get_cloud('ubuntu') - - # the second mock restores the original subp - with mock.patch.object(util, 'write_file') as mockwrite: - with mock.patch.object(util, 'subp', self.subp): - with mock.patch.object(cc_apt_configure, 'get_release', - return_value='fakerelease'): - with mock.patch.object(Distro, 'get_primary_arch', - return_value='amd64'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) - - mockwrite.assert_called_once_with( - '/etc/apt/sources.list', - EXPECTED_CONVERTED_CONTENT, - mode=420) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py new file mode 100644 index 00000000..f4411869 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py @@ -0,0 +1,200 @@ +""" test_handler_apt_configure_sources_list +Test templating of sources list +""" +import logging +import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import templater +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from cloudinit.distros.debian import Distro + +from .. import helpers as t_help + +LOG = logging.getLogger(__name__) + +YAML_TEXT_CUSTOM_SL = """ +apt_mirror: http://archive.ubuntu.com/ubuntu/ +apt_custom_sources_list: | + ## template:jinja + ## Note, this file is written by cloud-init on first boot of an instance + ## modifications made here will not survive a re-bundle. + ## if you wish to make changes you can: + ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg + ## or do the same in user-data + ## b.) add sources in /etc/apt/sources.list.d + ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl + + # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to + # newer versions of the distribution. + deb {{mirror}} {{codename}} main restricted + deb-src {{mirror}} {{codename}} main restricted + # FIND_SOMETHING_SPECIAL +""" + +EXPECTED_CONVERTED_CONTENT = ( + """## Note, this file is written by cloud-init on first boot of an instance +## modifications made here will not survive a re-bundle. +## if you wish to make changes you can: +## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg +## or do the same in user-data +## b.) add sources in /etc/apt/sources.list.d +## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl + +# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to +# newer versions of the distribution. +deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted +deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted +# FIND_SOMETHING_SPECIAL +""") + + +def load_tfile_or_url(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): + """TestAptSourceConfigSourceList + Main Class to test sources list rendering + """ + def setUp(self): + super(TestAptSourceConfigSourceList, self).setUp() + self.subp = util.subp + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': "fakerelease"} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def apt_source_list(self, distro, mirror, mirrorcheck=None): + """apt_source_list + Test rendering of a source.list from template for a given distro + """ + if mirrorcheck is None: + mirrorcheck = mirror + + if isinstance(mirror, list): + cfg = {'apt_mirror_search': mirror} + else: + cfg = {'apt_mirror': mirror} + mycloud = self._get_cloud(distro) + + with mock.patch.object(util, 'write_file') as mockwf: + with mock.patch.object(util, 'load_file', + return_value="faketmpl") as mocklf: + with mock.patch.object(os.path, 'isfile', + return_value=True) as mockisfile: + with mock.patch.object(templater, 'render_string', + return_value="fake") as mockrnd: + with mock.patch.object(util, 'rename'): + cc_apt_configure.handle("test", cfg, mycloud, + LOG, None) + + mockisfile.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mocklf.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mockrnd.assert_called_once_with('faketmpl', + {'RELEASE': 'fakerelease', + 'PRIMARY': mirrorcheck, + 'MIRROR': mirrorcheck, + 'SECURITY': mirrorcheck, + 'codename': 'fakerelease', + 'primary': mirrorcheck, + 'mirror': mirrorcheck, + 'security': mirrorcheck}) + mockwf.assert_called_once_with('/etc/apt/sources.list', 'fake', + mode=0o644) + + def test_apt_v1_source_list_debian(self): + """Test rendering of a source.list from template for debian""" + self.apt_source_list('debian', 'http://httpredir.debian.org/debian') + + def test_apt_v1_source_list_ubuntu(self): + """Test rendering of a source.list from template for ubuntu""" + self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/') + + @staticmethod + def myresolve(name): + """Fake util.is_resolvable for mirrorfail tests""" + if name == "does.not.exist": + print("Faking FAIL for '%s'" % name) + return False + else: + print("Faking SUCCESS for '%s'" % name) + return True + + def test_apt_v1_srcl_debian_mirrorfail(self): + """Test rendering of a source.list from template for debian""" + with mock.patch.object(util, 'is_resolvable', + side_effect=self.myresolve) as mockresolve: + self.apt_source_list('debian', + ['http://does.not.exist', + 'http://httpredir.debian.org/debian'], + 'http://httpredir.debian.org/debian') + mockresolve.assert_any_call("does.not.exist") + mockresolve.assert_any_call("httpredir.debian.org") + + def test_apt_v1_srcl_ubuntu_mirrorfail(self): + """Test rendering of a source.list from template for ubuntu""" + with mock.patch.object(util, 'is_resolvable', + side_effect=self.myresolve) as mockresolve: + self.apt_source_list('ubuntu', + ['http://does.not.exist', + 'http://archive.ubuntu.com/ubuntu/'], + 'http://archive.ubuntu.com/ubuntu/') + mockresolve.assert_any_call("does.not.exist") + mockresolve.assert_any_call("archive.ubuntu.com") + + def test_apt_v1_srcl_custom(self): + """Test rendering from a custom source.list template""" + cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) + mycloud = self._get_cloud('ubuntu') + + # the second mock restores the original subp + with mock.patch.object(util, 'write_file') as mockwrite: + with mock.patch.object(util, 'subp', self.subp): + with mock.patch.object(Distro, 'get_primary_arch', + return_value='amd64'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) + + mockwrite.assert_called_once_with( + '/etc/apt/sources.list', + EXPECTED_CONVERTED_CONTENT, + mode=420) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py new file mode 100644 index 00000000..e53b0450 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py @@ -0,0 +1,187 @@ +""" test_apt_custom_sources_list +Test templating of custom sources list +""" +import logging +import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from cloudinit.distros.debian import Distro + +from .. import helpers as t_help + +LOG = logging.getLogger(__name__) + +TARGET = "/" + +# Input and expected output for the custom template +YAML_TEXT_CUSTOM_SL = """ +apt: + primary: + - arches: [default] + uri: http://test.ubuntu.com/ubuntu/ + security: + - arches: [default] + uri: http://testsec.ubuntu.com/ubuntu/ + sources_list: | + + # Note, this file is written by cloud-init at install time. It should not + # end up on the installed system itself. + # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to + # newer versions of the distribution. + deb $MIRROR $RELEASE main restricted + deb-src $MIRROR $RELEASE main restricted + deb $PRIMARY $RELEASE universe restricted + deb $SECURITY $RELEASE-security multiverse + # FIND_SOMETHING_SPECIAL +""" + +EXPECTED_CONVERTED_CONTENT = """ +# Note, this file is written by cloud-init at install time. It should not +# end up on the installed system itself. +# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to +# newer versions of the distribution. +deb http://test.ubuntu.com/ubuntu/ fakerel main restricted +deb-src http://test.ubuntu.com/ubuntu/ fakerel main restricted +deb http://test.ubuntu.com/ubuntu/ fakerel universe restricted +deb http://testsec.ubuntu.com/ubuntu/ fakerel-security multiverse +# FIND_SOMETHING_SPECIAL +""" + +# mocked to be independent to the unittest system +MOCKED_APT_SRC_LIST = """ +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""" + +EXPECTED_BASE_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""") + +EXPECTED_MIRROR_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-security main restricted +""") + +EXPECTED_PRIMSEC_CONTENT = (""" +deb http://test.ubuntu.com/ubuntu/ notouched main restricted +deb-src http://test.ubuntu.com/ubuntu/ notouched main restricted +deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted +deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted +""") + + +class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): + """TestAptSourceConfigSourceList - Class to test sources list rendering""" + def setUp(self): + super(TestAptSourceConfigSourceList, self).setUp() + self.subp = util.subp + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': "fakerel"} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def _apt_source_list(self, cfg, expected, distro): + "_apt_source_list - Test rendering from template (generic)" + + # entry at top level now, wrap in 'apt' key + cfg = {'apt': cfg} + mycloud = self._get_cloud(distro) + with mock.patch.object(util, 'write_file') as mockwf: + with mock.patch.object(util, 'load_file', + return_value=MOCKED_APT_SRC_LIST) as mocklf: + with mock.patch.object(os.path, 'isfile', + return_value=True) as mockisfile: + with mock.patch.object(util, 'rename'): + cc_apt_configure.handle("test", cfg, mycloud, + LOG, None) + + # check if it would have loaded the distro template + mockisfile.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mocklf.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + # check expected content in result + mockwf.assert_called_once_with('/etc/apt/sources.list', expected, + mode=0o644) + + def test_apt_v3_source_list_debian(self): + """test_apt_v3_source_list_debian - without custom sources or parms""" + cfg = {} + self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'debian') + + def test_apt_v3_source_list_ubuntu(self): + """test_apt_v3_source_list_ubuntu - without custom sources or parms""" + cfg = {} + self._apt_source_list(cfg, EXPECTED_BASE_CONTENT, 'ubuntu') + + def test_apt_v3_source_list_psm(self): + """test_apt_v3_source_list_psm - Test specifying prim+sec mirrors""" + pm = 'http://test.ubuntu.com/ubuntu/' + sm = 'http://testsec.ubuntu.com/ubuntu/' + cfg = {'preserve_sources_list': False, + 'primary': [{'arches': ["default"], + 'uri': pm}], + 'security': [{'arches': ["default"], + 'uri': sm}]} + + self._apt_source_list(cfg, EXPECTED_PRIMSEC_CONTENT, 'ubuntu') + + def test_apt_v3_srcl_custom(self): + """test_apt_v3_srcl_custom - Test rendering a custom source template""" + cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) + mycloud = self._get_cloud('ubuntu') + + # the second mock restores the original subp + with mock.patch.object(util, 'write_file') as mockwrite: + with mock.patch.object(util, 'subp', self.subp): + with mock.patch.object(Distro, 'get_primary_arch', + return_value='amd64'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) + + calls = [call('/etc/apt/sources.list', + EXPECTED_CONVERTED_CONTENT, + mode=0o644)] + mockwrite.assert_has_calls(calls) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py deleted file mode 100644 index 99a4d860..00000000 --- a/tests/unittests/test_handler/test_handler_apt_source.py +++ /dev/null @@ -1,516 +0,0 @@ -""" test_handler_apt_source -Testing various config variations of the apt_source config -""" -import os -import re -import shutil -import tempfile - -try: - from unittest import mock -except ImportError: - import mock -from mock import call - -from cloudinit.config import cc_apt_configure -from cloudinit import gpg -from cloudinit import util - -from ..helpers import TestCase - -EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- -Version: GnuPG v1 - -mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ -NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy -8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 -HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG -CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 -OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ -FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm -S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== -=ACB2 ------END PGP PUBLIC KEY BLOCK-----""" - - -def load_tfile_or_url(*args, **kwargs): - """load_tfile_or_url - load file and return content after decoding - """ - return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) - - -class TestAptSourceConfig(TestCase): - """TestAptSourceConfig - Main Class to test apt_source configs - """ - release = "fantastic" - - def setUp(self): - super(TestAptSourceConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.aptlistfile = os.path.join(self.tmp, "single-deb.list") - self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") - self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") - self.join = os.path.join - # mock fallback filename into writable tmp dir - self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/", - "cloud_config_sources.list") - - patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release") - get_rel = patcher.start() - get_rel.return_value = self.release - self.addCleanup(patcher.stop) - - @staticmethod - def _get_default_params(): - """get_default_params - Get the most basic default mrror and release info to be used in tests - """ - params = {} - params['RELEASE'] = cc_apt_configure.get_release() - params['MIRROR'] = "http://archive.ubuntu.com/ubuntu" - return params - - def myjoin(self, *args, **kwargs): - """myjoin - redir into writable tmpdir""" - if (args[0] == "/etc/apt/sources.list.d/" and - args[1] == "cloud_config_sources.list" and - len(args) == 2): - return self.join(self.tmp, args[0].lstrip("/"), args[1]) - else: - return self.join(*args, **kwargs) - - def apt_src_basic(self, filename, cfg): - """apt_src_basic - Test Fix deb source string, has to overwrite mirror conf in params - """ - params = self._get_default_params() - - cc_apt_configure.add_apt_sources(cfg, params) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "karmic-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_basic(self): - """Test deb source string, overwrite mirror and filename""" - cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile} - self.apt_src_basic(self.aptlistfile, [cfg]) - - def test_apt_src_basic_dict(self): - """Test deb source string, overwrite mirror and filename (dict)""" - cfg = {self.aptlistfile: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')}} - self.apt_src_basic(self.aptlistfile, cfg) - - def apt_src_basic_tri(self, cfg): - """apt_src_basic_tri - Test Fix three deb source string, has to overwrite mirror conf in - params. Test with filenames provided in config. - generic part to check three files with different content - """ - self.apt_src_basic(self.aptlistfile, cfg) - - # extra verify on two extra files of this test - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "precise-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "lucid-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_basic_tri(self): - """Test Fix three deb source string with filenames""" - cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile} - cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' precise-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile2} - cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' lucid-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile3} - self.apt_src_basic_tri([cfg1, cfg2, cfg3]) - - def test_apt_src_basic_dict_tri(self): - """Test Fix three deb source string with filenames (dict)""" - cfg = {self.aptlistfile: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')}, - self.aptlistfile2: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' precise-backports' - ' main universe multiverse restricted')}, - self.aptlistfile3: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' lucid-backports' - ' main universe multiverse restricted')}} - self.apt_src_basic_tri(cfg) - - def test_apt_src_basic_nofn(self): - """Test Fix three deb source string without filenames (dict)""" - cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_basic(self.fallbackfn, [cfg]) - - def apt_src_replacement(self, filename, cfg): - """apt_src_replace - Test Autoreplacement of MIRROR and RELEASE in source specs - """ - params = self._get_default_params() - cc_apt_configure.add_apt_sources(cfg, params) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "multiverse"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_replace(self): - """Test Autoreplacement of MIRROR and RELEASE in source specs""" - cfg = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - self.apt_src_replacement(self.aptlistfile, [cfg]) - - def apt_src_replace_tri(self, cfg): - """apt_src_replace_tri - Test three autoreplacements of MIRROR and RELEASE in source specs with - generic part - """ - self.apt_src_replacement(self.aptlistfile, cfg) - - # extra verify on two extra files of this test - params = self._get_default_params() - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "main"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "universe"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_replace_tri(self): - """Test triple Autoreplacement of MIRROR and RELEASE in source specs""" - cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - cfg2 = {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'deb $MIRROR $RELEASE universe', - 'filename': self.aptlistfile3} - self.apt_src_replace_tri([cfg1, cfg2, cfg3]) - - def test_apt_src_replace_dict_tri(self): - """Test triple Autoreplacement in source specs (dict)""" - cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, - 'notused': {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2}, - self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} - self.apt_src_replace_tri(cfg) - - def test_apt_src_replace_nofn(self): - """Test Autoreplacement of MIRROR and RELEASE in source specs nofile""" - cfg = {'source': 'deb $MIRROR $RELEASE multiverse'} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_replacement(self.fallbackfn, [cfg]) - - def apt_src_keyid(self, filename, cfg, keynum): - """apt_src_keyid - Test specification of a source + keyid - """ - params = self._get_default_params() - - with mock.patch.object(util, 'subp', - return_value=('fakekey 1234', '')) as mockobj: - cc_apt_configure.add_apt_sources(cfg, params) - - # check if it added the right ammount of keys - calls = [] - for _ in range(keynum): - calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234')) - mockobj.assert_has_calls(calls, any_order=True) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "main"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_keyid(self): - """Test specification of a source + keyid with filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77", - 'filename': self.aptlistfile} - self.apt_src_keyid(self.aptlistfile, [cfg], 1) - - def test_apt_src_keyid_tri(self): - """Test 3x specification of a source + keyid with filename being set""" - cfg1 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77", - 'filename': self.aptlistfile} - cfg2 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial universe'), - 'keyid': "03683F77", - 'filename': self.aptlistfile2} - cfg3 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial multiverse'), - 'keyid': "03683F77", - 'filename': self.aptlistfile3} - - self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3) - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "universe"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "multiverse"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_keyid_nofn(self): - """Test specification of a source + keyid without filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77"} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_keyid(self.fallbackfn, [cfg], 1) - - def apt_src_key(self, filename, cfg): - """apt_src_key - Test specification of a source + key - """ - params = self._get_default_params() - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321') - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "main"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_key(self): - """Test specification of a source + key with filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'key': "fakekey 4321", - 'filename': self.aptlistfile} - self.apt_src_key(self.aptlistfile, cfg) - - def test_apt_src_key_nofn(self): - """Test specification of a source + key without filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'key': "fakekey 4321"} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_key(self.fallbackfn, cfg) - - def test_apt_src_keyonly(self): - """Test specifying key without source""" - params = self._get_default_params() - cfg = {'key': "fakekey 4242", - 'filename': self.aptlistfile} - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_once_with(('apt-key', 'add', '-'), - 'fakekey 4242') - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_keyidonly(self): - """Test specification of a keyid without source""" - params = self._get_default_params() - cfg = {'keyid': "03683F77", - 'filename': self.aptlistfile} - - with mock.patch.object(util, 'subp', - return_value=('fakekey 1212', '')) as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212') - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def apt_src_keyid_real(self, cfg, expectedkey): - """apt_src_keyid_real - Test specification of a keyid without source including - up to addition of the key (add_apt_key_raw mocked to keep the - environment as is) - """ - params = self._get_default_params() - - with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: - with mock.patch.object(gpg, 'get_key_by_id', - return_value=expectedkey) as mockgetkey: - cc_apt_configure.add_apt_sources([cfg], params) - - mockgetkey.assert_called_with(cfg['keyid'], - cfg.get('keyserver', - 'keyserver.ubuntu.com')) - mockkey.assert_called_with(expectedkey) - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_keyid_real(self): - """test_apt_src_keyid_real - Test keyid including key add""" - keyid = "03683F77" - cfg = {'keyid': keyid, - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_longkeyid_real(self): - """test_apt_src_longkeyid_real - Test long keyid including key add""" - keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" - cfg = {'keyid': keyid, - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_longkeyid_ks_real(self): - """test_apt_src_longkeyid_ks_real - Test long keyid from other ks""" - keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" - cfg = {'keyid': keyid, - 'keyserver': 'keys.gnupg.net', - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_ppa(self): - """Test adding a ppa""" - params = self._get_default_params() - cfg = {'source': 'ppa:smoser/cloud-init-test', - 'filename': self.aptlistfile} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params, - aa_repo_match=matcher) - mockobj.assert_called_once_with(['add-apt-repository', - 'ppa:smoser/cloud-init-test']) - - # adding ppa should ignore filename (uses add-apt-repository) - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_ppa_tri(self): - """Test adding three ppa's""" - params = self._get_default_params() - cfg1 = {'source': 'ppa:smoser/cloud-init-test', - 'filename': self.aptlistfile} - cfg2 = {'source': 'ppa:smoser/cloud-init-test2', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'ppa:smoser/cloud-init-test3', - 'filename': self.aptlistfile3} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params, - aa_repo_match=matcher) - calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])] - mockobj.assert_has_calls(calls, any_order=True) - - # adding ppa should ignore all filenames (uses add-apt-repository) - self.assertFalse(os.path.isfile(self.aptlistfile)) - self.assertFalse(os.path.isfile(self.aptlistfile2)) - self.assertFalse(os.path.isfile(self.aptlistfile3)) - - def test_convert_to_new_format(self): - """Test the conversion of old to new format""" - cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - cfg2 = {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'deb $MIRROR $RELEASE universe', - 'filename': self.aptlistfile3} - checkcfg = {self.aptlistfile: {'filename': self.aptlistfile, - 'source': 'deb $MIRROR $RELEASE ' - 'multiverse'}, - self.aptlistfile2: {'filename': self.aptlistfile2, - 'source': 'deb $MIRROR $RELEASE main'}, - self.aptlistfile3: {'filename': self.aptlistfile3, - 'source': 'deb $MIRROR $RELEASE ' - 'universe'}} - - newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3]) - self.assertEqual(newcfg, checkcfg) - - newcfg2 = cc_apt_configure.convert_to_new_format(newcfg) - self.assertEqual(newcfg2, checkcfg) - - with self.assertRaises(ValueError): - cc_apt_configure.convert_to_new_format(5) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source_v1.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py new file mode 100644 index 00000000..d96779c5 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py @@ -0,0 +1,551 @@ +""" test_handler_apt_source_v1 +Testing various config variations of the apt_source config +This calls all things with v1 format to stress the conversion code on top of +the actually tested code. +""" +import os +import re +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit.config import cc_apt_configure +from cloudinit import gpg +from cloudinit import util + +from ..helpers import TestCase + +EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ +NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy +8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 +HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG +CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 +OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ +FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm +S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== +=ACB2 +-----END PGP PUBLIC KEY BLOCK-----""" + +ADD_APT_REPO_MATCH = r"^[\w-]+:\w" + + +def load_tfile_or_url(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class FakeDistro(object): + """Fake Distro helper object""" + def update_package_sources(self): + """Fake update_package_sources helper method""" + return + + +class FakeCloud(object): + """Fake Cloud helper object""" + def __init__(self): + self.distro = FakeDistro() + + +class TestAptSourceConfig(TestCase): + """TestAptSourceConfig + Main Class to test apt_source configs + """ + release = "fantastic" + + def setUp(self): + super(TestAptSourceConfig, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.aptlistfile = os.path.join(self.tmp, "single-deb.list") + self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") + self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") + self.join = os.path.join + self.matcher = re.compile(ADD_APT_REPO_MATCH).search + # mock fallback filename into writable tmp dir + self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/", + "cloud_config_sources.list") + + self.fakecloud = FakeCloud() + + rpatcher = mock.patch("cloudinit.util.lsb_release") + get_rel = rpatcher.start() + get_rel.return_value = {'codename': self.release} + self.addCleanup(rpatcher.stop) + apatcher = mock.patch("cloudinit.util.get_architecture") + get_arch = apatcher.start() + get_arch.return_value = 'amd64' + self.addCleanup(apatcher.stop) + + def _get_default_params(self): + """get_default_params + Get the most basic default mrror and release info to be used in tests + """ + params = {} + params['RELEASE'] = self.release + params['MIRROR'] = "http://archive.ubuntu.com/ubuntu" + return params + + def wrapv1conf(self, cfg): + params = self._get_default_params() + # old v1 list format under old keys, but callabe to main handler + # disable source.list rendering and set mirror to avoid other code + return {'apt_preserve_sources_list': True, + 'apt_mirror': params['MIRROR'], + 'apt_sources': cfg} + + def myjoin(self, *args, **kwargs): + """myjoin - redir into writable tmpdir""" + if (args[0] == "/etc/apt/sources.list.d/" and + args[1] == "cloud_config_sources.list" and + len(args) == 2): + return self.join(self.tmp, args[0].lstrip("/"), args[1]) + else: + return self.join(*args, **kwargs) + + def apt_src_basic(self, filename, cfg): + """apt_src_basic + Test Fix deb source string, has to overwrite mirror conf in params + """ + cfg = self.wrapv1conf(cfg) + + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "karmic-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_basic(self): + """Test deb source string, overwrite mirror and filename""" + cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile} + self.apt_src_basic(self.aptlistfile, [cfg]) + + def test_apt_src_basic_dict(self): + """Test deb source string, overwrite mirror and filename (dict)""" + cfg = {self.aptlistfile: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}} + self.apt_src_basic(self.aptlistfile, cfg) + + def apt_src_basic_tri(self, cfg): + """apt_src_basic_tri + Test Fix three deb source string, has to overwrite mirror conf in + params. Test with filenames provided in config. + generic part to check three files with different content + """ + self.apt_src_basic(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "precise-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "lucid-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_basic_tri(self): + """Test Fix three deb source string with filenames""" + cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile} + cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile2} + cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile3} + self.apt_src_basic_tri([cfg1, cfg2, cfg3]) + + def test_apt_src_basic_dict_tri(self): + """Test Fix three deb source string with filenames (dict)""" + cfg = {self.aptlistfile: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}, + self.aptlistfile2: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted')}, + self.aptlistfile3: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted')}} + self.apt_src_basic_tri(cfg) + + def test_apt_src_basic_nofn(self): + """Test Fix three deb source string without filenames (dict)""" + cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_basic(self.fallbackfn, [cfg]) + + def apt_src_replacement(self, filename, cfg): + """apt_src_replace + Test Autoreplacement of MIRROR and RELEASE in source specs + """ + cfg = self.wrapv1conf(cfg) + params = self._get_default_params() + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_replace(self): + """Test Autoreplacement of MIRROR and RELEASE in source specs""" + cfg = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + self.apt_src_replacement(self.aptlistfile, [cfg]) + + def apt_src_replace_tri(self, cfg): + """apt_src_replace_tri + Test three autoreplacements of MIRROR and RELEASE in source specs with + generic part + """ + self.apt_src_replacement(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + params = self._get_default_params() + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "main"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "universe"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_replace_tri(self): + """Test triple Autoreplacement of MIRROR and RELEASE in source specs""" + cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + cfg2 = {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'deb $MIRROR $RELEASE universe', + 'filename': self.aptlistfile3} + self.apt_src_replace_tri([cfg1, cfg2, cfg3]) + + def test_apt_src_replace_dict_tri(self): + """Test triple Autoreplacement in source specs (dict)""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, + 'notused': {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} + self.apt_src_replace_tri(cfg) + + def test_apt_src_replace_nofn(self): + """Test Autoreplacement of MIRROR and RELEASE in source specs nofile""" + cfg = {'source': 'deb $MIRROR $RELEASE multiverse'} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_replacement(self.fallbackfn, [cfg]) + + def apt_src_keyid(self, filename, cfg, keynum): + """apt_src_keyid + Test specification of a source + keyid + """ + cfg = self.wrapv1conf(cfg) + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1234', '')) as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + # check if it added the right ammount of keys + calls = [] + for _ in range(keynum): + calls.append(call(['apt-key', 'add', '-'], + data=b'fakekey 1234', + target=None)) + mockobj.assert_has_calls(calls, any_order=True) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_keyid(self): + """Test specification of a source + keyid with filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77", + 'filename': self.aptlistfile} + self.apt_src_keyid(self.aptlistfile, [cfg], 1) + + def test_apt_src_keyid_tri(self): + """Test 3x specification of a source + keyid with filename being set""" + cfg1 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77", + 'filename': self.aptlistfile} + cfg2 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial universe'), + 'keyid': "03683F77", + 'filename': self.aptlistfile2} + cfg3 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial multiverse'), + 'keyid': "03683F77", + 'filename': self.aptlistfile3} + + self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3) + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "universe"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_keyid_nofn(self): + """Test specification of a source + keyid without filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_keyid(self.fallbackfn, [cfg], 1) + + def apt_src_key(self, filename, cfg): + """apt_src_key + Test specification of a source + key + """ + cfg = self.wrapv1conf([cfg]) + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + mockobj.assert_called_with(['apt-key', 'add', '-'], + data=b'fakekey 4321', target=None) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_key(self): + """Test specification of a source + key with filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321", + 'filename': self.aptlistfile} + self.apt_src_key(self.aptlistfile, cfg) + + def test_apt_src_key_nofn(self): + """Test specification of a source + key without filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321"} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_key(self.fallbackfn, cfg) + + def test_apt_src_keyonly(self): + """Test specifying key without source""" + cfg = {'key': "fakekey 4242", + 'filename': self.aptlistfile} + cfg = self.wrapv1conf([cfg]) + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + mockobj.assert_called_once_with(['apt-key', 'add', '-'], + data=b'fakekey 4242', target=None) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_keyidonly(self): + """Test specification of a keyid without source""" + cfg = {'keyid': "03683F77", + 'filename': self.aptlistfile} + cfg = self.wrapv1conf([cfg]) + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1212', '')) as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + + mockobj.assert_called_with(['apt-key', 'add', '-'], + data=b'fakekey 1212', target=None) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def apt_src_keyid_real(self, cfg, expectedkey): + """apt_src_keyid_real + Test specification of a keyid without source including + up to addition of the key (add_apt_key_raw mocked to keep the + environment as is) + """ + key = cfg['keyid'] + keyserver = cfg.get('keyserver', 'keyserver.ubuntu.com') + cfg = self.wrapv1conf([cfg]) + + with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: + with mock.patch.object(gpg, 'getkeybyid', + return_value=expectedkey) as mockgetkey: + cc_apt_configure.handle("test", cfg, self.fakecloud, + None, None) + + mockgetkey.assert_called_with(key, keyserver) + mockkey.assert_called_with(expectedkey, None) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_keyid_real(self): + """test_apt_src_keyid_real - Test keyid including key add""" + keyid = "03683F77" + cfg = {'keyid': keyid, + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_longkeyid_real(self): + """test_apt_src_longkeyid_real - Test long keyid including key add""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {'keyid': keyid, + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_longkeyid_ks_real(self): + """test_apt_src_longkeyid_ks_real - Test long keyid from other ks""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {'keyid': keyid, + 'keyserver': 'keys.gnupg.net', + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_ppa(self): + """Test adding a ppa""" + cfg = {'source': 'ppa:smoser/cloud-init-test', + 'filename': self.aptlistfile} + cfg = self.wrapv1conf([cfg]) + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) + mockobj.assert_called_once_with(['add-apt-repository', + 'ppa:smoser/cloud-init-test'], + target=None) + + # adding ppa should ignore filename (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_ppa_tri(self): + """Test adding three ppa's""" + cfg1 = {'source': 'ppa:smoser/cloud-init-test', + 'filename': self.aptlistfile} + cfg2 = {'source': 'ppa:smoser/cloud-init-test2', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'ppa:smoser/cloud-init-test3', + 'filename': self.aptlistfile3} + cfg = self.wrapv1conf([cfg1, cfg2, cfg3]) + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.handle("test", cfg, self.fakecloud, + None, None) + calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'], + target=None), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'], + target=None), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'], + target=None)] + mockobj.assert_has_calls(calls, any_order=True) + + # adding ppa should ignore all filenames (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + self.assertFalse(os.path.isfile(self.aptlistfile2)) + self.assertFalse(os.path.isfile(self.aptlistfile3)) + + def test_convert_to_new_format(self): + """Test the conversion of old to new format""" + cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + cfg2 = {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'deb $MIRROR $RELEASE universe', + 'filename': self.aptlistfile3} + checkcfg = {self.aptlistfile: {'filename': self.aptlistfile, + 'source': 'deb $MIRROR $RELEASE ' + 'multiverse'}, + self.aptlistfile2: {'filename': self.aptlistfile2, + 'source': 'deb $MIRROR $RELEASE main'}, + self.aptlistfile3: {'filename': self.aptlistfile3, + 'source': 'deb $MIRROR $RELEASE ' + 'universe'}} + + newcfg = cc_apt_configure.convert_v1_to_v2_apt_format([cfg1, cfg2, + cfg3]) + self.assertEqual(newcfg, checkcfg) + + newcfg2 = cc_apt_configure.convert_v1_to_v2_apt_format(newcfg) + self.assertEqual(newcfg2, checkcfg) + + with self.assertRaises(ValueError): + cc_apt_configure.convert_v1_to_v2_apt_format(5) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py new file mode 100644 index 00000000..75556b6d --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py @@ -0,0 +1,1103 @@ +"""test_handler_apt_source_v3 +Testing various config variations of the apt_source custom config +This tries to call all in the new v3 format and cares about new features +""" +import glob +import os +import re +import shutil +import socket +import tempfile + +from unittest import TestCase + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import gpg +from cloudinit import helpers +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from .. import helpers as t_help + +EXPECTEDKEY = u"""-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ +NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy +8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 +HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG +CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 +OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ +FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm +S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== +=ACB2 +-----END PGP PUBLIC KEY BLOCK-----""" + +ADD_APT_REPO_MATCH = r"^[\w-]+:\w" + +TARGET = None + + +def load_tfile(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class TestAptSourceConfig(t_help.FilesystemMockingTestCase): + """TestAptSourceConfig + Main Class to test apt configs + """ + def setUp(self): + super(TestAptSourceConfig, self).setUp() + self.tmp = tempfile.mkdtemp() + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.aptlistfile = os.path.join(self.tmp, "single-deb.list") + self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") + self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") + self.join = os.path.join + self.matcher = re.compile(ADD_APT_REPO_MATCH).search + + @staticmethod + def _add_apt_sources(*args, **kwargs): + with mock.patch.object(cc_apt_configure, 'update_packages'): + cc_apt_configure.add_apt_sources(*args, **kwargs) + + @staticmethod + def _get_default_params(): + """get_default_params + Get the most basic default mrror and release info to be used in tests + """ + params = {} + params['RELEASE'] = util.lsb_release()['codename'] + arch = 'amd64' + params['MIRROR'] = cc_apt_configure.\ + get_default_mirrors(arch)["PRIMARY"] + return params + + def _myjoin(self, *args, **kwargs): + """_myjoin - redir into writable tmpdir""" + if (args[0] == "/etc/apt/sources.list.d/" and + args[1] == "cloud_config_sources.list" and + len(args) == 2): + return self.join(self.tmp, args[0].lstrip("/"), args[1]) + else: + return self.join(*args, **kwargs) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def _apt_src_basic(self, filename, cfg): + """_apt_src_basic + Test Fix deb source string, has to overwrite mirror conf in params + """ + params = self._get_default_params() + + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "karmic-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_basic(self): + """test_apt_v3_src_basic - Test fix deb source string""" + cfg = {self.aptlistfile: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}} + self._apt_src_basic(self.aptlistfile, cfg) + + def test_apt_v3_src_basic_tri(self): + """test_apt_v3_src_basic_tri - Test multiple fix deb source strings""" + cfg = {self.aptlistfile: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}, + self.aptlistfile2: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted')}, + self.aptlistfile3: {'source': + ('deb http://test.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted')}} + self._apt_src_basic(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "precise-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://test.ubuntu.com/ubuntu", + "lucid-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def _apt_src_replacement(self, filename, cfg): + """apt_src_replace + Test Autoreplacement of MIRROR and RELEASE in source specs + """ + params = self._get_default_params() + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_replace(self): + """test_apt_v3_src_replace - Test replacement of MIRROR & RELEASE""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}} + self._apt_src_replacement(self.aptlistfile, cfg) + + def test_apt_v3_src_replace_fn(self): + """test_apt_v3_src_replace_fn - Test filename overwritten in dict""" + cfg = {'ignored': {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile}} + # second file should overwrite the dict key + self._apt_src_replacement(self.aptlistfile, cfg) + + def _apt_src_replace_tri(self, cfg): + """_apt_src_replace_tri + Test three autoreplacements of MIRROR and RELEASE in source specs with + generic part + """ + self._apt_src_replacement(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + params = self._get_default_params() + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "main"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "universe"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_replace_tri(self): + """test_apt_v3_src_replace_tri - Test multiple replace/overwrites""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, + 'notused': {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} + self._apt_src_replace_tri(cfg) + + def _apt_src_keyid(self, filename, cfg, keynum): + """_apt_src_keyid + Test specification of a source + keyid + """ + params = self._get_default_params() + + with mock.patch("cloudinit.util.subp", + return_value=('fakekey 1234', '')) as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + # check if it added the right ammount of keys + calls = [] + for _ in range(keynum): + calls.append(call(['apt-key', 'add', '-'], data=b'fakekey 1234', + target=TARGET)) + mockobj.assert_has_calls(calls, any_order=True) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_keyid(self): + """test_apt_v3_src_keyid - Test source + keyid with filename""" + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"}} + self._apt_src_keyid(self.aptlistfile, cfg, 1) + + def test_apt_v3_src_keyid_tri(self): + """test_apt_v3_src_keyid_tri - Test multiple src+key+filen writes""" + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"}, + 'ignored': {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial universe'), + 'keyid': "03683F77", + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial multiverse'), + 'keyid': "03683F77"}} + + self._apt_src_keyid(self.aptlistfile, cfg, 3) + contents = load_tfile(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "universe"), + contents, flags=re.IGNORECASE)) + contents = load_tfile(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_key(self): + """test_apt_v3_src_key - Test source + key""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321"}} + + with mock.patch.object(util, 'subp') as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4321', + target=TARGET) + + self.assertTrue(os.path.isfile(self.aptlistfile)) + + contents = load_tfile(self.aptlistfile) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_v3_src_keyonly(self): + """test_apt_v3_src_keyonly - Test key without source""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'key': "fakekey 4242"}} + + with mock.patch.object(util, 'subp') as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4242', + target=TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_keyidonly(self): + """test_apt_v3_src_keyidonly - Test keyid without source""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'keyid': "03683F77"}} + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1212', '')) as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 1212', + target=TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def apt_src_keyid_real(self, cfg, expectedkey): + """apt_src_keyid_real + Test specification of a keyid without source including + up to addition of the key (add_apt_key_raw mocked to keep the + environment as is) + """ + params = self._get_default_params() + + with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: + with mock.patch.object(gpg, 'getkeybyid', + return_value=expectedkey) as mockgetkey: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + keycfg = cfg[self.aptlistfile] + mockgetkey.assert_called_with(keycfg['keyid'], + keycfg.get('keyserver', + 'keyserver.ubuntu.com')) + mockkey.assert_called_with(expectedkey, TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_keyid_real(self): + """test_apt_v3_src_keyid_real - Test keyid including key add""" + keyid = "03683F77" + cfg = {self.aptlistfile: {'keyid': keyid}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_longkeyid_real(self): + """test_apt_v3_src_longkeyid_real Test long keyid including key add""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {self.aptlistfile: {'keyid': keyid}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_longkeyid_ks_real(self): + """test_apt_v3_src_longkeyid_ks_real Test long keyid from other ks""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {self.aptlistfile: {'keyid': keyid, + 'keyserver': 'keys.gnupg.net'}} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_v3_src_keyid_keyserver(self): + """test_apt_v3_src_keyid_keyserver - Test custom keyserver""" + keyid = "03683F77" + params = self._get_default_params() + cfg = {self.aptlistfile: {'keyid': keyid, + 'keyserver': 'test.random.com'}} + + # in some test environments only *.ubuntu.com is reachable + # so mock the call and check if the config got there + with mock.patch.object(gpg, 'getkeybyid', + return_value="fakekey") as mockgetkey: + with mock.patch.object(cc_apt_configure, + 'add_apt_key_raw') as mockadd: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + + mockgetkey.assert_called_with('03683F77', 'test.random.com') + mockadd.assert_called_with('fakekey', TARGET) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_ppa(self): + """test_apt_v3_src_ppa - Test specification of a ppa""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'}} + + with mock.patch("cloudinit.util.subp") as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + mockobj.assert_any_call(['add-apt-repository', + 'ppa:smoser/cloud-init-test'], target=TARGET) + + # adding ppa should ignore filename (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_v3_src_ppa_tri(self): + """test_apt_v3_src_ppa_tri - Test specification of multiple ppa's""" + params = self._get_default_params() + cfg = {self.aptlistfile: {'source': 'ppa:smoser/cloud-init-test'}, + self.aptlistfile2: {'source': 'ppa:smoser/cloud-init-test2'}, + self.aptlistfile3: {'source': 'ppa:smoser/cloud-init-test3'}} + + with mock.patch("cloudinit.util.subp") as mockobj: + self._add_apt_sources(cfg, TARGET, template_params=params, + aa_repo_match=self.matcher) + calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test'], + target=TARGET), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test2'], + target=TARGET), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'], + target=TARGET)] + mockobj.assert_has_calls(calls, any_order=True) + + # adding ppa should ignore all filenames (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + self.assertFalse(os.path.isfile(self.aptlistfile2)) + self.assertFalse(os.path.isfile(self.aptlistfile3)) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_list_rename(self, m_get_architecture): + """test_apt_v3_list_rename - Test find mirror and apt list renaming""" + pre = "/var/lib/apt/lists" + # filenames are archive dependent + + arch = 's390x' + m_get_architecture.return_value = arch + component = "ubuntu-ports" + archive = "ports.ubuntu.com" + + cfg = {'primary': [{'arches': ["default"], + 'uri': + 'http://test.ubuntu.com/%s/' % component}], + 'security': [{'arches': ["default"], + 'uri': + 'http://testsec.ubuntu.com/%s/' % component}]} + post = ("%s_dists_%s-updates_InRelease" % + (component, util.lsb_release()['codename'])) + fromfn = ("%s/%s_%s" % (pre, archive, post)) + tofn = ("%s/test.ubuntu.com_%s" % (pre, post)) + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + + self.assertEqual(mirrors['MIRROR'], + "http://test.ubuntu.com/%s/" % component) + self.assertEqual(mirrors['PRIMARY'], + "http://test.ubuntu.com/%s/" % component) + self.assertEqual(mirrors['SECURITY'], + "http://testsec.ubuntu.com/%s/" % component) + + with mock.patch.object(os, 'rename') as mockren: + with mock.patch.object(glob, 'glob', + return_value=[fromfn]): + cc_apt_configure.rename_apt_lists(mirrors, TARGET) + + mockren.assert_any_call(fromfn, tofn) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_list_rename_non_slash(self, m_get_architecture): + target = os.path.join(self.tmp, "rename_non_slash") + apt_lists_d = os.path.join(target, "./" + cc_apt_configure.APT_LISTS) + + m_get_architecture.return_value = 'amd64' + + mirror_path = "some/random/path/" + primary = "http://test.ubuntu.com/" + mirror_path + security = "http://test-security.ubuntu.com/" + mirror_path + mirrors = {'PRIMARY': primary, 'SECURITY': security} + + # these match default archive prefixes + opri_pre = "archive.ubuntu.com_ubuntu_dists_xenial" + osec_pre = "security.ubuntu.com_ubuntu_dists_xenial" + # this one won't match and should not be renamed defaults. + other_pre = "dl.google.com_linux_chrome_deb_dists_stable" + # these are our new expected prefixes + npri_pre = "test.ubuntu.com_some_random_path_dists_xenial" + nsec_pre = "test-security.ubuntu.com_some_random_path_dists_xenial" + + files = [ + # orig prefix, new prefix, suffix + (opri_pre, npri_pre, "_main_binary-amd64_Packages"), + (opri_pre, npri_pre, "_main_binary-amd64_InRelease"), + (opri_pre, npri_pre, "-updates_main_binary-amd64_Packages"), + (opri_pre, npri_pre, "-updates_main_binary-amd64_InRelease"), + (other_pre, other_pre, "_main_binary-amd64_Packages"), + (other_pre, other_pre, "_Release"), + (other_pre, other_pre, "_Release.gpg"), + (osec_pre, nsec_pre, "_InRelease"), + (osec_pre, nsec_pre, "_main_binary-amd64_Packages"), + (osec_pre, nsec_pre, "_universe_binary-amd64_Packages"), + ] + + expected = sorted([npre + suff for opre, npre, suff in files]) + # create files + for (opre, npre, suff) in files: + fpath = os.path.join(apt_lists_d, opre + suff) + util.write_file(fpath, content=fpath) + + cc_apt_configure.rename_apt_lists(mirrors, target) + found = sorted(os.listdir(apt_lists_d)) + self.assertEqual(expected, found) + + @staticmethod + def test_apt_v3_proxy(): + """test_apt_v3_proxy - Test apt_*proxy configuration""" + cfg = {"proxy": "foobar1", + "http_proxy": "foobar2", + "ftp_proxy": "foobar3", + "https_proxy": "foobar4"} + + with mock.patch.object(util, 'write_file') as mockobj: + cc_apt_configure.apply_apt_config(cfg, "proxyfn", "notused") + + mockobj.assert_called_with('proxyfn', + ('Acquire::http::Proxy "foobar1";\n' + 'Acquire::http::Proxy "foobar2";\n' + 'Acquire::ftp::Proxy "foobar3";\n' + 'Acquire::https::Proxy "foobar4";\n')) + + def test_apt_v3_mirror(self): + """test_apt_v3_mirror - Test defining a mirror""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir}], + "security": [{'arches': ["default"], + "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_default(self): + """test_apt_v3_mirror_default - Test without defining a mirror""" + arch = 'amd64' + default_mirrors = cc_apt_configure.get_default_mirrors(arch) + pmir = default_mirrors["PRIMARY"] + smir = default_mirrors["SECURITY"] + mycloud = self._get_cloud('ubuntu') + mirrors = cc_apt_configure.find_apt_mirror_info({}, mycloud, arch) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_arches(self): + """test_apt_v3_mirror_arches - Test arches selection of mirror""" + pmir = "http://my-primary.ubuntu.com/ubuntu/" + smir = "http://my-security.ubuntu.com/ubuntu/" + arch = 'ppc64el' + cfg = {"primary": [{'arches': ["default"], "uri": "notthis-primary"}, + {'arches': [arch], "uri": pmir}], + "security": [{'arches': ["default"], "uri": "nothis-security"}, + {'arches': [arch], "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + + self.assertEqual(mirrors['PRIMARY'], pmir) + self.assertEqual(mirrors['MIRROR'], pmir) + self.assertEqual(mirrors['SECURITY'], smir) + + def test_apt_v3_mirror_arches_default(self): + """test_apt_v3_mirror_arches - Test falling back to default arch""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir}, + {'arches': ["thisarchdoesntexist"], + "uri": "notthis"}], + "security": [{'arches': ["thisarchdoesntexist"], + "uri": "nothat"}, + {'arches': ["default"], + "uri": smir}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + @mock.patch("cloudinit.config.cc_apt_configure.util.get_architecture") + def test_apt_v3_get_def_mir_non_intel_no_arch(self, m_get_architecture): + arch = 'ppc64el' + m_get_architecture.return_value = arch + expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports', + 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'} + self.assertEqual(expected, cc_apt_configure.get_default_mirrors()) + + def test_apt_v3_get_default_mirrors_non_intel_with_arch(self): + found = cc_apt_configure.get_default_mirrors('ppc64el') + + expected = {'PRIMARY': 'http://ports.ubuntu.com/ubuntu-ports', + 'SECURITY': 'http://ports.ubuntu.com/ubuntu-ports'} + self.assertEqual(expected, found) + + def test_apt_v3_mirror_arches_sysdefault(self): + """test_apt_v3_mirror_arches - Test arches fallback to sys default""" + arch = 'amd64' + default_mirrors = cc_apt_configure.get_default_mirrors(arch) + pmir = default_mirrors["PRIMARY"] + smir = default_mirrors["SECURITY"] + mycloud = self._get_cloud('ubuntu') + cfg = {"primary": [{'arches': ["thisarchdoesntexist_64"], + "uri": "notthis"}, + {'arches': ["thisarchdoesntexist"], + "uri": "notthiseither"}], + "security": [{'arches': ["thisarchdoesntexist"], + "uri": "nothat"}, + {'arches': ["thisarchdoesntexist_64"], + "uri": "nothateither"}]} + + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + + self.assertEqual(mirrors['MIRROR'], pmir) + self.assertEqual(mirrors['PRIMARY'], pmir) + self.assertEqual(mirrors['SECURITY'], smir) + + def test_apt_v3_mirror_search(self): + """test_apt_v3_mirror_search - Test searching mirrors in a list + mock checks to avoid relying on network connectivity""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "search": ["pfailme", pmir]}], + "security": [{'arches': ["default"], + "search": ["sfailme", smir]}]} + + with mock.patch.object(cc_apt_configure, 'search_for_mirror', + side_effect=[pmir, smir]) as mocksearch: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, + 'amd64') + + calls = [call(["pfailme", pmir]), + call(["sfailme", smir])] + mocksearch.assert_has_calls(calls) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_mirror_search_many2(self): + """test_apt_v3_mirror_search_many3 - Test both mirrors specs at once""" + pmir = "http://us.archive.ubuntu.com/ubuntu/" + smir = "http://security.ubuntu.com/ubuntu/" + cfg = {"primary": [{'arches': ["default"], + "uri": pmir, + "search": ["pfailme", "foo"]}], + "security": [{'arches': ["default"], + "uri": smir, + "search": ["sfailme", "bar"]}]} + + arch = 'amd64' + + # should be called only once per type, despite two mirror configs + mycloud = None + with mock.patch.object(cc_apt_configure, 'get_mirror', + return_value="http://mocked/foo") as mockgm: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(cfg, 'primary', arch, mycloud), + call(cfg, 'security', arch, mycloud)] + mockgm.assert_has_calls(calls) + + # should not be called, since primary is specified + with mock.patch.object(cc_apt_configure, + 'search_for_mirror') as mockse: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + mockse.assert_not_called() + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + def test_apt_v3_url_resolvable(self): + """test_apt_v3_url_resolvable - Test resolving urls""" + + with mock.patch.object(util, 'is_resolvable') as mockresolve: + util.is_resolvable_url("http://1.2.3.4/ubuntu") + mockresolve.assert_called_with("1.2.3.4") + + with mock.patch.object(util, 'is_resolvable') as mockresolve: + util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu") + mockresolve.assert_called_with("us.archive.ubuntu.com") + + # former tests can leave this set (or not if the test is ran directly) + # do a hard reset to ensure a stable result + util._DNS_REDIRECT_IP = None + bad = [(None, None, None, "badname", ["10.3.2.1"])] + good = [(None, None, None, "goodname", ["10.2.3.4"])] + with mock.patch.object(socket, 'getaddrinfo', + side_effect=[bad, bad, bad, good, + good]) as mocksock: + ret = util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu") + ret2 = util.is_resolvable_url("http://1.2.3.4/ubuntu") + mocksock.assert_any_call('does-not-exist.example.com.', None, + 0, 0, 1, 2) + mocksock.assert_any_call('example.invalid.', None, 0, 0, 1, 2) + mocksock.assert_any_call('us.archive.ubuntu.com', None) + mocksock.assert_any_call('1.2.3.4', None) + + self.assertTrue(ret) + self.assertTrue(ret2) + + # side effect need only bad ret after initial call + with mock.patch.object(socket, 'getaddrinfo', + side_effect=[bad]) as mocksock: + ret3 = util.is_resolvable_url("http://failme.com/ubuntu") + calls = [call('failme.com', None)] + mocksock.assert_has_calls(calls) + self.assertFalse(ret3) + + def test_apt_v3_disable_suites(self): + """test_disable_suites - disable_suites with many configurations""" + release = "xenial" + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + + # disable nothing + disabled = [] + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable release suite + disabled = ["$RELEASE"] + expect = """\ +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable other suite + disabled = ["$RELEASE-updates"] + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu""" + """ xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # multi disable + disabled = ["$RELEASE-updates", "$RELEASE-security"] + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # multi line disable (same suite multiple times in input) + disabled = ["$RELEASE-updates", "$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://UBUNTU.com//ubuntu xenial-updates main +deb http://UBUNTU.COM//ubuntu xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +# suite disabled by cloud-init: deb http://UBUNTU.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """ + """xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # comment in input + disabled = ["$RELEASE-updates", "$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +#foo +#deb http://UBUNTU.com//ubuntu xenial-updates main +deb http://UBUNTU.COM//ubuntu xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +#foo +#deb http://UBUNTU.com//ubuntu xenial-updates main +# suite disabled by cloud-init: deb http://UBUNTU.COM//ubuntu """ + """xenial-updates main +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable custom suite + disabled = ["foobar"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ foobar main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +# suite disabled by cloud-init: deb http://ubuntu.com/ubuntu/ foobar main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable non existing suite + disabled = ["foobar"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ notfoobar main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +deb http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb http://ubuntu.com/ubuntu/ notfoobar main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite with option + disabled = ["$RELEASE-updates"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [a=b] http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb [a=b] http://ubu.com//ubu """ + """xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite with more options and auto $RELEASE expansion + disabled = ["updates"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [a=b c=d] http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = """deb http://ubuntu.com//ubuntu xenial main +# suite disabled by cloud-init: deb [a=b c=d] \ +http://ubu.com//ubu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + # single disable suite while options at others + disabled = ["$RELEASE-security"] + orig = """deb http://ubuntu.com//ubuntu xenial main +deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main +deb http://ubuntu.com//ubuntu xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""" + expect = ("""deb http://ubuntu.com//ubuntu xenial main +deb [arch=foo] http://ubuntu.com//ubuntu xenial-updates main +# suite disabled by cloud-init: deb http://ubuntu.com//ubuntu """ + """xenial-security main +deb-src http://ubuntu.com//ubuntu universe multiverse +deb http://ubuntu.com/ubuntu/ xenial-proposed main""") + result = cc_apt_configure.disable_suites(disabled, orig, release) + self.assertEqual(expect, result) + + def test_disable_suites_blank_lines(self): + """test_disable_suites_blank_lines - ensure blank lines allowed""" + lines = ["deb %(repo)s %(rel)s main universe", + "", + "deb %(repo)s %(rel)s-updates main universe", + " # random comment", + "#comment here", + ""] + rel = "trusty" + repo = 'http://example.com/mirrors/ubuntu' + orig = "\n".join(lines) % {'repo': repo, 'rel': rel} + self.assertEqual( + orig, cc_apt_configure.disable_suites(["proposed"], orig, rel)) + + def test_apt_v3_mirror_search_dns(self): + """test_apt_v3_mirror_search_dns - Test searching dns patterns""" + pmir = "phit" + smir = "shit" + arch = 'amd64' + mycloud = self._get_cloud('ubuntu') + cfg = {"primary": [{'arches': ["default"], + "search_dns": True}], + "security": [{'arches': ["default"], + "search_dns": True}]} + + with mock.patch.object(cc_apt_configure, 'get_mirror', + return_value="http://mocked/foo") as mockgm: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(cfg, 'primary', arch, mycloud), + call(cfg, 'security', arch, mycloud)] + mockgm.assert_has_calls(calls) + + with mock.patch.object(cc_apt_configure, 'search_for_mirror_dns', + return_value="http://mocked/foo") as mocksdns: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + calls = [call(True, 'primary', cfg, mycloud), + call(True, 'security', cfg, mycloud)] + mocksdns.assert_has_calls(calls) + + # first return is for the non-dns call before + with mock.patch.object(cc_apt_configure, 'search_for_mirror', + side_effect=[None, pmir, None, smir]) as mockse: + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, mycloud, arch) + + calls = [call(None), + call(['http://ubuntu-mirror.localdomain/ubuntu', + 'http://ubuntu-mirror/ubuntu']), + call(None), + call(['http://ubuntu-security-mirror.localdomain/ubuntu', + 'http://ubuntu-security-mirror/ubuntu'])] + mockse.assert_has_calls(calls) + + self.assertEqual(mirrors['MIRROR'], + pmir) + self.assertEqual(mirrors['PRIMARY'], + pmir) + self.assertEqual(mirrors['SECURITY'], + smir) + + +class TestDebconfSelections(TestCase): + + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + def test_no_set_sel_if_none_to_set(self, m_set_sel): + cc_apt_configure.apply_debconf_selections({'foo': 'bar'}) + m_set_sel.assert_not_called() + + @mock.patch("cloudinit.config.cc_apt_configure." + "debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_set_sel_call_has_expected_input(self, m_get_inst, m_set_sel): + data = { + 'set1': 'pkga pkga/q1 mybool false', + 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n' + 'pkgc\tpkgc/ip\tstring\t10.0.0.1')} + lines = '\n'.join(data.values()).split('\n') + + m_get_inst.return_value = ["adduser", "apparmor"] + m_set_sel.return_value = None + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + self.assertTrue(m_get_inst.called) + self.assertEqual(m_set_sel.call_count, 1) + + # assumes called with *args value. + selections = m_set_sel.call_args_list[0][0][0].decode() + + missing = [l for l in lines if l not in selections.splitlines()] + self.assertEqual([], missing) + + @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure") + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_reconfigure_if_intersection(self, m_get_inst, m_set_sel, + m_dpkg_r): + data = { + 'set1': 'pkga pkga/q1 mybool false', + 'set2': ('pkgb\tpkgb/b1\tstr\tthis is a string\n' + 'pkgc\tpkgc/ip\tstring\t10.0.0.1'), + 'cloud-init': ('cloud-init cloud-init/datasources' + 'multiselect MAAS')} + + m_set_sel.return_value = None + m_get_inst.return_value = ["adduser", "apparmor", "pkgb", + "cloud-init", 'zdog'] + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + + # reconfigure should be called with the intersection + # of (packages in config, packages installed) + self.assertEqual(m_dpkg_r.call_count, 1) + # assumes called with *args (dpkg_reconfigure([a,b,c], target=)) + packages = m_dpkg_r.call_args_list[0][0][0] + self.assertEqual(set(['cloud-init', 'pkgb']), set(packages)) + + @mock.patch("cloudinit.config.cc_apt_configure.dpkg_reconfigure") + @mock.patch("cloudinit.config.cc_apt_configure.debconf_set_selections") + @mock.patch("cloudinit.config.cc_apt_configure." + "util.get_installed_packages") + def test_reconfigure_if_no_intersection(self, m_get_inst, m_set_sel, + m_dpkg_r): + data = {'set1': 'pkga pkga/q1 mybool false'} + + m_get_inst.return_value = ["adduser", "apparmor", "pkgb", + "cloud-init", 'zdog'] + m_set_sel.return_value = None + + cc_apt_configure.apply_debconf_selections({'debconf_selections': data}) + + self.assertTrue(m_get_inst.called) + self.assertEqual(m_dpkg_r.call_count, 0) + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_does_reconfigure(self, m_subp): + target = "/foo-target" + + # due to the way the cleaners are called (via dictionary reference) + # mocking clean_cloud_init directly does not work. So we mock + # the CONFIG_CLEANERS dictionary and assert our cleaner is called. + ci_cleaner = mock.MagicMock() + with mock.patch.dict(("cloudinit.config.cc_apt_configure." + "CONFIG_CLEANERS"), + values={'cloud-init': ci_cleaner}, clear=True): + cc_apt_configure.dpkg_reconfigure(['pkga', 'cloud-init'], + target=target) + # cloud-init is actually the only package we have a cleaner for + # so for now, its the only one that should reconfigured + self.assertTrue(m_subp.called) + ci_cleaner.assert_called_with(target) + self.assertEqual(m_subp.call_count, 1) + found = m_subp.call_args_list[0][0][0] + expected = ['dpkg-reconfigure', '--frontend=noninteractive', + 'cloud-init'] + self.assertEqual(expected, found) + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_not_done_on_no_data(self, m_subp): + cc_apt_configure.dpkg_reconfigure([]) + m_subp.assert_not_called() + + @mock.patch("cloudinit.config.cc_apt_configure.util.subp") + def test_dpkg_reconfigure_not_done_if_no_cleaners(self, m_subp): + cc_apt_configure.dpkg_reconfigure(['pkgfoo', 'pkgbar']) + m_subp.assert_not_called() + +# +# vi: ts=4 expandtab diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 73369cd3..d2031f59 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -508,4 +508,73 @@ class TestReadSeeded(helpers.TestCase): self.assertEqual(found_md, {'key1': 'val1'}) self.assertEqual(found_ud, ud) + +class TestSubp(helpers.TestCase): + + stdin2err = ['bash', '-c', 'cat >&2'] + stdin2out = ['cat'] + utf8_invalid = b'ab\xaadef' + utf8_valid = b'start \xc3\xa9 end' + utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' + + def printf_cmd(self, *args): + # bash's printf supports \xaa. So does /usr/bin/printf + # but by using bash, we remove dependency on another program. + return(['bash', '-c', 'printf "$@"', 'printf'] + list(args)) + + def test_subp_handles_utf8(self): + # The given bytes contain utf-8 accented characters as seen in e.g. + # the "deja dup" package in Ubuntu. + cmd = self.printf_cmd(self.utf8_valid_2) + (out, _err) = util.subp(cmd, capture=True) + self.assertEqual(out, self.utf8_valid_2.decode('utf-8')) + + def test_subp_respects_decode_false(self): + (out, err) = util.subp(self.stdin2out, capture=True, decode=False, + data=self.utf8_valid) + self.assertTrue(isinstance(out, bytes)) + self.assertTrue(isinstance(err, bytes)) + self.assertEqual(out, self.utf8_valid) + + def test_subp_decode_ignore(self): + # this executes a string that writes invalid utf-8 to stdout + (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'), + capture=True, decode='ignore') + self.assertEqual(out, 'abcdef') + + def test_subp_decode_strict_valid_utf8(self): + (out, _err) = util.subp(self.stdin2out, capture=True, + decode='strict', data=self.utf8_valid) + self.assertEqual(out, self.utf8_valid.decode('utf-8')) + + def test_subp_decode_invalid_utf8_replaces(self): + (out, _err) = util.subp(self.stdin2out, capture=True, + data=self.utf8_invalid) + expected = self.utf8_invalid.decode('utf-8', errors='replace') + self.assertEqual(out, expected) + + def test_subp_decode_strict_raises(self): + args = [] + kwargs = {'args': self.stdin2out, 'capture': True, + 'decode': 'strict', 'data': self.utf8_invalid} + self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs) + + def test_subp_capture_stderr(self): + data = b'hello world' + (out, err) = util.subp(self.stdin2err, capture=True, + decode=False, data=data) + self.assertEqual(err, data) + self.assertEqual(out, b'') + + def test_returns_none_if_no_capture(self): + (out, err) = util.subp(self.stdin2out, data=b'', capture=False) + self.assertEqual(err, None) + self.assertEqual(out, None) + + def test_bunch_of_slashes_in_path(self): + self.assertEqual("/target/my/path/", + util.target_path("/target/", "//my/path/")) + self.assertEqual("/target/my/path/", + util.target_path("/target/", "///my/path/")) + # vi: ts=4 expandtab -- cgit v1.2.3