From c5e949c02a1d5226ae9b1cb39846db19d223c6c2 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Wed, 25 Mar 2020 09:44:16 -0400 Subject: distros/tests/test_init: add tests for _get_package_mirror_info (#272) --- cloudinit/distros/tests/test_init.py | 83 ++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 cloudinit/distros/tests/test_init.py (limited to 'cloudinit/distros/tests/test_init.py') diff --git a/cloudinit/distros/tests/test_init.py b/cloudinit/distros/tests/test_init.py new file mode 100644 index 00000000..707a0a49 --- /dev/null +++ b/cloudinit/distros/tests/test_init.py @@ -0,0 +1,83 @@ +# Copyright (C) 2020 Canonical Ltd. +# +# Author: Daniel Watkins +# +# This file is part of cloud-init. See LICENSE file for license information. +"""Tests for cloudinit/distros/__init__.py""" + +from unittest import mock + +import pytest + +from cloudinit.distros import _get_package_mirror_info + + +class TestGetPackageMirrorInfo: + """ + Tests for cloudinit.distros._get_package_mirror_info. + + These supplement the tests in tests/unittests/test_distros/test_generic.py + which are more focused on testing a single production-like configuration. + These tests are more focused on specific aspects of the unit under test. + """ + + @pytest.mark.parametrize('mirror_info,expected', [ + # Empty info gives empty return + ({}, {}), + # failsafe values used if present + ({'failsafe': {'primary': 'value', 'security': 'other'}}, + {'primary': 'value', 'security': 'other'}), + # search values used if present + ({'search': {'primary': ['value'], 'security': ['other']}}, + {'primary': ['value'], 'security': ['other']}), + # failsafe values used if search value not present + ({'search': {'primary': ['value']}, 'failsafe': {'security': 'other'}}, + {'primary': ['value'], 'security': 'other'}) + ]) + def test_get_package_mirror_info_failsafe(self, mirror_info, expected): + """ + Test the interaction between search and failsafe inputs + + (This doesn't test the case where the mirror_filter removes all search + options; test_failsafe_used_if_all_search_results_filtered_out covers + that.) + """ + assert expected == _get_package_mirror_info(mirror_info, + mirror_filter=lambda x: x) + + def test_failsafe_used_if_all_search_results_filtered_out(self): + """Test the failsafe option used if all search options eliminated.""" + mirror_info = { + 'search': {'primary': ['value']}, 'failsafe': {'primary': 'other'} + } + assert {'primary': 'other'} == _get_package_mirror_info( + mirror_info, mirror_filter=lambda x: False) + + @pytest.mark.parametrize('availability_zone,region,patterns,expected', ( + # Test ec2_region alone + ('fk-fake-1f', None, ['EC2-%(ec2_region)s'], ['EC2-fk-fake-1']), + # Test availability_zone alone + ('fk-fake-1f', None, ['AZ-%(availability_zone)s'], ['AZ-fk-fake-1f']), + # Test region alone + (None, 'fk-fake-1', ['RG-%(region)s'], ['RG-fk-fake-1']), + # Test that ec2_region is not available for non-matching AZs + ('fake-fake-1f', None, + ['EC2-%(ec2_region)s', 'AZ-%(availability_zone)s'], + ['AZ-fake-fake-1f']), + # Test that template order maintained + (None, 'fake-region', ['RG-%(region)s-2', 'RG-%(region)s-1'], + ['RG-fake-region-2', 'RG-fake-region-1']), + )) + def test_substitution(self, availability_zone, region, patterns, expected): + """Test substitution works as expected.""" + m_data_source = mock.Mock( + availability_zone=availability_zone, region=region + ) + mirror_info = {'search': {'primary': patterns}} + + ret = _get_package_mirror_info( + mirror_info, + data_source=m_data_source, + mirror_filter=lambda x: x + ) + assert {'primary': expected} == ret -- cgit v1.2.3 From c478d0bff412c67280dfe8f08568de733f9425a1 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Tue, 31 Mar 2020 13:52:21 -0400 Subject: distros: replace invalid characters in mirror URLs with hyphens (#291) This modifies _get_package_mirror_info to convert the hostnames of generated mirror URLs to their IDNA form, and then iterate through them replacing any invalid characters (i.e. anything other than letters, digits or a hyphen) with a hyphen. This commit introduces the following changes in behaviour: * generated mirror URLs with Unicode characters in their hostnames will have their hostnames converted to their all-ASCII IDNA form * generated mirror URLs with invalid-for-hostname characters in their hostname will have those characters converted to hyphens * generated mirror URLs which cannot be parsed by `urllib.parse.urlsplit` will not be considered for use * other configured patterns will still be considered * if all configured patterns fail to produce a URL that parses then the fallback mirror URL will be used LP: #1868232 --- cloudinit/distros/__init__.py | 109 ++++++++++++++++++++++++++++++++++- cloudinit/distros/tests/test_init.py | 84 +++++++++++++++++++++------ 2 files changed, 174 insertions(+), 19 deletions(-) (limited to 'cloudinit/distros/tests/test_init.py') diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 92598a2d..86a22c3e 100755 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -13,6 +13,8 @@ import abc import os import re import stat +import string +import urllib.parse from io import StringIO from cloudinit import importer @@ -50,6 +52,9 @@ _EC2_AZ_RE = re.compile('^[a-z][a-z]-(?:[a-z]+-)+[0-9][a-z]$') # Default NTP Client Configurations PREFERRED_NTP_CLIENTS = ['chrony', 'systemd-timesyncd', 'ntp', 'ntpdate'] +# Letters/Digits/Hyphen characters, for use in domain name validation +LDH_ASCII_CHARS = string.ascii_letters + string.digits + "-" + class Distro(metaclass=abc.ABCMeta): @@ -720,6 +725,102 @@ class Distro(metaclass=abc.ABCMeta): LOG.info("Added user '%s' to group '%s'", member, name) +def _apply_hostname_transformations_to_url(url: str, transformations: list): + """ + Apply transformations to a URL's hostname, return transformed URL. + + This is a separate function because unwrapping and rewrapping only the + hostname portion of a URL is complex. + + :param url: + The URL to operate on. + :param transformations: + A list of ``(str) -> Optional[str]`` functions, which will be applied + in order to the hostname portion of the URL. If any function + (regardless of ordering) returns None, ``url`` will be returned without + any modification. + + :return: + A string whose value is ``url`` with the hostname ``transformations`` + applied, or ``None`` if ``url`` is unparseable. + """ + try: + parts = urllib.parse.urlsplit(url) + except ValueError: + # If we can't even parse the URL, we shouldn't use it for anything + return None + new_hostname = parts.hostname + + for transformation in transformations: + new_hostname = transformation(new_hostname) + if new_hostname is None: + # If a transformation returns None, that indicates we should abort + # processing and return `url` unmodified + return url + + new_netloc = new_hostname + if parts.port is not None: + new_netloc = "{}:{}".format(new_netloc, parts.port) + return urllib.parse.urlunsplit(parts._replace(netloc=new_netloc)) + + +def _sanitize_mirror_url(url: str): + """ + Given a mirror URL, replace or remove any invalid URI characters. + + This performs the following actions on the URL's hostname: + * Checks if it is an IP address, returning the URL immediately if it is + * Converts it to its IDN form (see below for details) + * Replaces any non-Letters/Digits/Hyphen (LDH) characters in it with + hyphens + * TODO: Remove any leading/trailing hyphens from each domain name label + + Before we replace any invalid domain name characters, we first need to + ensure that any valid non-ASCII characters in the hostname will not be + replaced, by ensuring the hostname is in its Internationalized domain name + (IDN) representation (see RFC 5890). This conversion has to be applied to + the whole hostname (rather than just the substitution variables), because + the Punycode algorithm used by IDNA transcodes each part of the hostname as + a whole string (rather than encoding individual characters). It cannot be + applied to the whole URL, because (a) the Punycode algorithm expects to + operate on domain names so doesn't output a valid URL, and (b) non-ASCII + characters in non-hostname parts of the URL aren't encoded via Punycode. + + To put this in RFC 5890's terminology: before we remove or replace any + characters from our domain name (which we do to ensure that each label is a + valid LDH Label), we first ensure each label is in its A-label form. + + (Note that Python's builtin idna encoding is actually IDNA2003, not + IDNA2008. This changes the specifics of how some characters are encoded to + ASCII, but doesn't affect the logic here.) + + :param url: + The URL to operate on. + + :return: + A sanitized version of the URL, which will have been IDNA encoded if + necessary, or ``None`` if the generated string is not a parseable URL. + """ + # Acceptable characters are LDH characters, plus "." to separate each label + acceptable_chars = LDH_ASCII_CHARS + "." + transformations = [ + # This is an IP address, not a hostname, so no need to apply the + # transformations + lambda hostname: None if net.is_ip_address(hostname) else hostname, + + # Encode with IDNA to get the correct characters (as `bytes`), then + # decode with ASCII so we return a `str` + lambda hostname: hostname.encode('idna').decode('ascii'), + + # Replace any unacceptable characters with "-" + lambda hostname: ''.join( + c if c in acceptable_chars else "-" for c in hostname + ), + ] + + return _apply_hostname_transformations_to_url(url, transformations) + + def _get_package_mirror_info(mirror_info, data_source=None, mirror_filter=util.search_for_mirror): # given a arch specific 'mirror_info' entry (from package_mirrors) @@ -748,9 +849,13 @@ def _get_package_mirror_info(mirror_info, data_source=None, mirrors = [] for tmpl in searchlist: try: - mirrors.append(tmpl % subst) + mirror = tmpl % subst except KeyError: - pass + continue + + mirror = _sanitize_mirror_url(mirror) + if mirror is not None: + mirrors.append(mirror) found = mirror_filter(mirrors) if found: diff --git a/cloudinit/distros/tests/test_init.py b/cloudinit/distros/tests/test_init.py index 707a0a49..daa81ab8 100644 --- a/cloudinit/distros/tests/test_init.py +++ b/cloudinit/distros/tests/test_init.py @@ -9,7 +9,18 @@ from unittest import mock import pytest -from cloudinit.distros import _get_package_mirror_info +from cloudinit.distros import _get_package_mirror_info, LDH_ASCII_CHARS + + +# Define a set of characters we would expect to be replaced +INVALID_URL_CHARS = [ + chr(x) for x in range(127) if chr(x) not in LDH_ASCII_CHARS +] +for separator in [":", ".", "/", "#", "?", "@", "[", "]"]: + # Remove from the set characters that either separate hostname parts (":", + # "."), terminate hostnames ("/", "#", "?", "@"), or cause Python to be + # unable to parse URLs ("[", "]"). + INVALID_URL_CHARS.remove(separator) class TestGetPackageMirrorInfo: @@ -25,14 +36,16 @@ class TestGetPackageMirrorInfo: # Empty info gives empty return ({}, {}), # failsafe values used if present - ({'failsafe': {'primary': 'value', 'security': 'other'}}, - {'primary': 'value', 'security': 'other'}), + ({'failsafe': {'primary': 'http://value', 'security': 'http://other'}}, + {'primary': 'http://value', 'security': 'http://other'}), # search values used if present - ({'search': {'primary': ['value'], 'security': ['other']}}, - {'primary': ['value'], 'security': ['other']}), + ({'search': {'primary': ['http://value'], + 'security': ['http://other']}}, + {'primary': ['http://value'], 'security': ['http://other']}), # failsafe values used if search value not present - ({'search': {'primary': ['value']}, 'failsafe': {'security': 'other'}}, - {'primary': ['value'], 'security': 'other'}) + ({'search': {'primary': ['http://value']}, + 'failsafe': {'security': 'http://other'}}, + {'primary': ['http://value'], 'security': 'http://other'}) ]) def test_get_package_mirror_info_failsafe(self, mirror_info, expected): """ @@ -48,26 +61,63 @@ class TestGetPackageMirrorInfo: def test_failsafe_used_if_all_search_results_filtered_out(self): """Test the failsafe option used if all search options eliminated.""" mirror_info = { - 'search': {'primary': ['value']}, 'failsafe': {'primary': 'other'} + 'search': {'primary': ['http://value']}, + 'failsafe': {'primary': 'http://other'} } - assert {'primary': 'other'} == _get_package_mirror_info( + assert {'primary': 'http://other'} == _get_package_mirror_info( mirror_info, mirror_filter=lambda x: False) @pytest.mark.parametrize('availability_zone,region,patterns,expected', ( # Test ec2_region alone - ('fk-fake-1f', None, ['EC2-%(ec2_region)s'], ['EC2-fk-fake-1']), + ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'], + ['http://ec2-fk-fake-1/ubuntu']), # Test availability_zone alone - ('fk-fake-1f', None, ['AZ-%(availability_zone)s'], ['AZ-fk-fake-1f']), + ('fk-fake-1f', None, ['http://AZ-%(availability_zone)s/ubuntu'], + ['http://az-fk-fake-1f/ubuntu']), # Test region alone - (None, 'fk-fake-1', ['RG-%(region)s'], ['RG-fk-fake-1']), + (None, 'fk-fake-1', ['http://RG-%(region)s/ubuntu'], + ['http://rg-fk-fake-1/ubuntu']), # Test that ec2_region is not available for non-matching AZs ('fake-fake-1f', None, - ['EC2-%(ec2_region)s', 'AZ-%(availability_zone)s'], - ['AZ-fake-fake-1f']), + ['http://EC2-%(ec2_region)s/ubuntu', + 'http://AZ-%(availability_zone)s/ubuntu'], + ['http://az-fake-fake-1f/ubuntu']), # Test that template order maintained - (None, 'fake-region', ['RG-%(region)s-2', 'RG-%(region)s-1'], - ['RG-fake-region-2', 'RG-fake-region-1']), - )) + (None, 'fake-region', + ['http://RG-%(region)s-2/ubuntu', 'http://RG-%(region)s-1/ubuntu'], + ['http://rg-fake-region-2/ubuntu', 'http://rg-fake-region-1/ubuntu']), + # Test that non-ASCII hostnames are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com/ubuntu'], + ['http://www.xn--idna--4kd53hh6aba3q.com/ubuntu']), + # Test that non-ASCII hostnames with a port are IDNA encoded; + # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q" + (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com:8080/ubuntu'], + ['http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu']), + # Test that non-ASCII non-hostname parts of URLs are unchanged + (None, 'ТεЅТ̣', ['http://www.example.com/%(region)s/ubuntu'], + ['http://www.example.com/ТεЅТ̣/ubuntu']), + # Test that IPv4 addresses are unchanged + (None, 'fk-fake-1', ['http://192.168.1.1:8080/%(region)s/ubuntu'], + ['http://192.168.1.1:8080/fk-fake-1/ubuntu']), + # Test that IPv6 addresses are unchanged + (None, 'fk-fake-1', + ['http://[2001:67c:1360:8001::23]/%(region)s/ubuntu'], + ['http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu']), + # Test that unparseable URLs are filtered out of the mirror list + (None, 'inv[lid', + ['http://%(region)s.in.hostname/should/be/filtered', + 'http://but.not.in.the.path/%(region)s'], + ['http://but.not.in.the.path/inv[lid']), + ) + ( + # Dynamically generate a test case for each non-LDH + # (Letters/Digits/Hyphen) ASCII character, testing that it is + # substituted with a hyphen + tuple( + (None, 'fk{0}fake{0}1'.format(invalid_char), + ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu']) + for invalid_char in INVALID_URL_CHARS)) + ) def test_substitution(self, availability_zone, region, patterns, expected): """Test substitution works as expected.""" m_data_source = mock.Mock( -- cgit v1.2.3 From 1bbc4908ff7a2be19483811b3b6fee6ebc916235 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Tue, 31 Mar 2020 17:04:17 -0400 Subject: distros: drop leading/trailing hyphens from mirror URL labels (#296) * distros/tests/test_init: drop needless brackets/indentation * distros: drop leading/trailing hyphens from mirror URL labels --- cloudinit/distros/__init__.py | 5 +++++ cloudinit/distros/tests/test_init.py | 14 ++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) (limited to 'cloudinit/distros/tests/test_init.py') diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 86a22c3e..e63b8c7a 100755 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -816,6 +816,11 @@ def _sanitize_mirror_url(url: str): lambda hostname: ''.join( c if c in acceptable_chars else "-" for c in hostname ), + + # Drop leading/trailing hyphens from each part of the hostname + lambda hostname: '.'.join( + part.strip('-') for part in hostname.split('.') + ), ] return _apply_hostname_transformations_to_url(url, transformations) diff --git a/cloudinit/distros/tests/test_init.py b/cloudinit/distros/tests/test_init.py index daa81ab8..40939133 100644 --- a/cloudinit/distros/tests/test_init.py +++ b/cloudinit/distros/tests/test_init.py @@ -109,15 +109,17 @@ class TestGetPackageMirrorInfo: ['http://%(region)s.in.hostname/should/be/filtered', 'http://but.not.in.the.path/%(region)s'], ['http://but.not.in.the.path/inv[lid']), - ) + ( + (None, '-some-region-', + ['http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu'], + ['http://lead-ing.some-region.trail-ing.example.com/ubuntu']), + ) + tuple( # Dynamically generate a test case for each non-LDH # (Letters/Digits/Hyphen) ASCII character, testing that it is # substituted with a hyphen - tuple( - (None, 'fk{0}fake{0}1'.format(invalid_char), - ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu']) - for invalid_char in INVALID_URL_CHARS)) - ) + (None, 'fk{0}fake{0}1'.format(invalid_char), + ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu']) + for invalid_char in INVALID_URL_CHARS + )) def test_substitution(self, availability_zone, region, patterns, expected): """Test substitution works as expected.""" m_data_source = mock.Mock( -- cgit v1.2.3 From 3fcdacc8995d6908858aceaf1da7ee5ff090fc04 Mon Sep 17 00:00:00 2001 From: lucasmoura Date: Tue, 30 Jun 2020 19:25:26 -0300 Subject: Disable ec2 mirror for non aws instances (#390) For versions before 20.2, we allowed the use of ec2 mirrors if the datasource availability_zone matches one of the ec2 regions. We are now updating that behavior to allow allow the use of ec2 mirrors on ec2 instances or if the user directly passes an an ec2 mirror url through #cloud-config apt directives. LP: #1456277 --- cloudinit/distros/__init__.py | 10 +- cloudinit/distros/tests/test_init.py | 35 +++- cloudinit/features.py | 11 ++ tests/unittests/test_distros/test_generic.py | 186 +++++++++++++-------- .../test_handler_apt_configure_sources_list_v1.py | 6 +- .../test_handler/test_handler_apt_source_v1.py | 7 + .../test_handler/test_handler_apt_source_v3.py | 27 ++- 7 files changed, 193 insertions(+), 89 deletions(-) (limited to 'cloudinit/distros/tests/test_init.py') diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 89940cf0..2fc91bbc 100755 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -28,6 +28,9 @@ from cloudinit import type_utils from cloudinit import subp from cloudinit import util +from cloudinit.features import \ + ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES + from cloudinit.distros.parsers import hosts from .networking import LinuxNetworking @@ -849,7 +852,12 @@ def _get_package_mirror_info(mirror_info, data_source=None, # ec2 availability zones are named cc-direction-[0-9][a-d] (us-east-1b) # the region is us-east-1. so region = az[0:-1] if _EC2_AZ_RE.match(data_source.availability_zone): - subst['ec2_region'] = "%s" % data_source.availability_zone[0:-1] + ec2_region = data_source.availability_zone[0:-1] + + if ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES: + subst['ec2_region'] = "%s" % ec2_region + elif data_source.platform_type == "ec2": + subst['ec2_region'] = "%s" % ec2_region if data_source and data_source.region: subst['region'] = data_source.region diff --git a/cloudinit/distros/tests/test_init.py b/cloudinit/distros/tests/test_init.py index 40939133..db534654 100644 --- a/cloudinit/distros/tests/test_init.py +++ b/cloudinit/distros/tests/test_init.py @@ -67,6 +67,9 @@ class TestGetPackageMirrorInfo: assert {'primary': 'http://other'} == _get_package_mirror_info( mirror_info, mirror_filter=lambda x: False) + @pytest.mark.parametrize('allow_ec2_mirror, platform_type', [ + (True, 'ec2') + ]) @pytest.mark.parametrize('availability_zone,region,patterns,expected', ( # Test ec2_region alone ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'], @@ -120,16 +123,34 @@ class TestGetPackageMirrorInfo: ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu']) for invalid_char in INVALID_URL_CHARS )) - def test_substitution(self, availability_zone, region, patterns, expected): + def test_valid_substitution(self, + allow_ec2_mirror, + platform_type, + availability_zone, + region, + patterns, + expected): """Test substitution works as expected.""" + flag_path = "cloudinit.distros." \ + "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + m_data_source = mock.Mock( - availability_zone=availability_zone, region=region + availability_zone=availability_zone, + region=region, + platform_type=platform_type ) mirror_info = {'search': {'primary': patterns}} - ret = _get_package_mirror_info( - mirror_info, - data_source=m_data_source, - mirror_filter=lambda x: x - ) + with mock.patch(flag_path, allow_ec2_mirror): + ret = _get_package_mirror_info( + mirror_info, + data_source=m_data_source, + mirror_filter=lambda x: x + ) + print(allow_ec2_mirror) + print(platform_type) + print(availability_zone) + print(region) + print(patterns) + print(expected) assert {'primary': expected} == ret diff --git a/cloudinit/features.py b/cloudinit/features.py index e455213d..c44fa29e 100644 --- a/cloudinit/features.py +++ b/cloudinit/features.py @@ -26,6 +26,17 @@ After the 20.2 release, we instead raise an exception. This flag can be removed after Focal is no longer supported """ + +ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES = False +""" +When configuring apt mirrors, old behavior is to allow +the use of ec2 mirrors if the datasource availability_zone format +matches one of the possible aws ec2 regions. After the 20.2 release, we +no longer publish ec2 region mirror urls on non-AWS cloud platforms. +Besides feature_overrides.py, users can override this by providing +#cloud-config apt directives. +""" + try: # pylint: disable=wildcard-import from cloudinit.feature_overrides import * # noqa diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index 6b6c1566..44607489 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -6,6 +6,7 @@ from cloudinit import util from cloudinit.tests import helpers import os +import pytest import shutil import tempfile from unittest import mock @@ -37,24 +38,6 @@ gapmi = distros._get_arch_package_mirror_info class TestGenericDistro(helpers.FilesystemMockingTestCase): - def return_first(self, mlist): - if not mlist: - return None - return mlist[0] - - def return_second(self, mlist): - if not mlist: - return None - return mlist[1] - - def return_none(self, _mlist): - return None - - def return_last(self, mlist): - if not mlist: - return None - return(mlist[-1]) - def setUp(self): super(TestGenericDistro, self).setUp() # Make a temp directoy for tests to use. @@ -145,61 +128,6 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): arch_mirrors = gapmi(package_mirrors, arch="amd64") self.assertEqual(package_mirrors[0], arch_mirrors) - def test_get_package_mirror_info_az_ec2(self): - arch_mirrors = gapmi(package_mirrors, arch="amd64") - data_source_mock = mock.Mock(availability_zone="us-east-1a") - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - self.assertEqual(results, - {'primary': 'http://us-east-1.ec2/', - 'security': 'http://security-mirror1-intel'}) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_second) - self.assertEqual(results, - {'primary': 'http://us-east-1a.clouds/', - 'security': 'http://security-mirror2-intel'}) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_none) - self.assertEqual(results, package_mirrors[0]['failsafe']) - - def test_get_package_mirror_info_az_non_ec2(self): - arch_mirrors = gapmi(package_mirrors, arch="amd64") - data_source_mock = mock.Mock(availability_zone="nova.cloudvendor") - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - self.assertEqual(results, - {'primary': 'http://nova.cloudvendor.clouds/', - 'security': 'http://security-mirror1-intel'}) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_last) - self.assertEqual(results, - {'primary': 'http://nova.cloudvendor.clouds/', - 'security': 'http://security-mirror2-intel'}) - - def test_get_package_mirror_info_none(self): - arch_mirrors = gapmi(package_mirrors, arch="amd64") - data_source_mock = mock.Mock(availability_zone=None) - - # because both search entries here replacement based on - # availability-zone, the filter will be called with an empty list and - # failsafe should be taken. - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_first) - self.assertEqual(results, - {'primary': 'http://fs-primary-intel', - 'security': 'http://security-mirror1-intel'}) - - results = gpmi(arch_mirrors, data_source=data_source_mock, - mirror_filter=self.return_last) - self.assertEqual(results, - {'primary': 'http://fs-primary-intel', - 'security': 'http://security-mirror2-intel'}) - def test_systemd_in_use(self): cls = distros.fetch("ubuntu") d = cls("ubuntu", {}, None) @@ -259,4 +187,116 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): ["pw", "usermod", "myuser", "-p", "01-Jan-1970"]) +class TestGetPackageMirrors: + + def return_first(self, mlist): + if not mlist: + return None + return mlist[0] + + def return_second(self, mlist): + if not mlist: + return None + + return mlist[1] if len(mlist) > 1 else None + + def return_none(self, _mlist): + return None + + def return_last(self, mlist): + if not mlist: + return None + return(mlist[-1]) + + @pytest.mark.parametrize( + "allow_ec2_mirror, platform_type, mirrors", + [ + (True, "ec2", [ + {'primary': 'http://us-east-1.ec2/', + 'security': 'http://security-mirror1-intel'}, + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror2-intel'} + ]), + (True, "other", [ + {'primary': 'http://us-east-1.ec2/', + 'security': 'http://security-mirror1-intel'}, + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror2-intel'} + ]), + (False, "ec2", [ + {'primary': 'http://us-east-1.ec2/', + 'security': 'http://security-mirror1-intel'}, + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror2-intel'} + ]), + (False, "other", [ + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror1-intel'}, + {'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror2-intel'} + ]) + ]) + def test_get_package_mirror_info_az_ec2(self, + allow_ec2_mirror, + platform_type, + mirrors): + flag_path = "cloudinit.distros." \ + "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES" + with mock.patch(flag_path, allow_ec2_mirror): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock( + availability_zone="us-east-1a", + platform_type=platform_type) + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_first) + assert(results == mirrors[0]) + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_second) + assert(results == mirrors[1]) + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_none) + assert(results == package_mirrors[0]['failsafe']) + + def test_get_package_mirror_info_az_non_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone="nova.cloudvendor") + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_first) + assert(results == { + 'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror1-intel'} + ) + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_last) + assert(results == { + 'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror2-intel'} + ) + + def test_get_package_mirror_info_none(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone=None) + + # because both search entries here replacement based on + # availability-zone, the filter will be called with an empty list and + # failsafe should be taken. + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_first) + assert(results == { + 'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror1-intel'} + ) + + results = gpmi(arch_mirrors, data_source=data_source_mock, + mirror_filter=self.return_last) + assert(results == { + 'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror2-intel'} + ) + # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py index e5382544..369480be 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py @@ -101,6 +101,7 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): cfg = {'apt_mirror_search': mirror} else: cfg = {'apt_mirror': mirror} + mycloud = self._get_cloud(distro) with mock.patch.object(util, 'write_file') as mockwf: @@ -108,8 +109,9 @@ class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): return_value="faketmpl") as mocklf: with mock.patch.object(os.path, 'isfile', return_value=True) as mockisfile: - with mock.patch.object(templater, 'render_string', - return_value="fake") as mockrnd: + with mock.patch.object( + templater, 'render_string', + return_value='fake') as mockrnd: with mock.patch.object(util, 'rename'): cc_apt_configure.handle("test", cfg, mycloud, LOG, None) diff --git a/tests/unittests/test_handler/test_handler_apt_source_v1.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py index f2349157..367971cb 100644 --- a/tests/unittests/test_handler/test_handler_apt_source_v1.py +++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py @@ -43,10 +43,17 @@ class FakeDistro(object): return +class FakeDatasource: + """Fake Datasource helper object""" + def __init__(self): + self.region = 'region' + + class FakeCloud(object): """Fake Cloud helper object""" def __init__(self): self.distro = FakeDistro() + self.datasource = FakeDatasource() class TestAptSourceConfig(TestCase): diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py index 220100e2..ac847238 100644 --- a/tests/unittests/test_handler/test_handler_apt_source_v3.py +++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py @@ -49,6 +49,18 @@ MOCK_LSB_RELEASE_DATA = { 'release': '18.04', 'codename': 'bionic'} +class FakeDatasource: + """Fake Datasource helper object""" + def __init__(self): + self.region = 'region' + + +class FakeCloud: + """Fake Cloud helper object""" + def __init__(self): + self.datasource = FakeDatasource() + + class TestAptSourceConfig(t_help.FilesystemMockingTestCase): """TestAptSourceConfig Main Class to test apt configs @@ -471,7 +483,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): fromfn = ("%s/%s_%s" % (pre, archive, post)) tofn = ("%s/test.ubuntu.com_%s" % (pre, post)) - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, FakeCloud(), arch) self.assertEqual(mirrors['MIRROR'], "http://test.ubuntu.com/%s/" % component) @@ -559,7 +571,8 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): "security": [{'arches': ["default"], "uri": smir}]} - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + mirrors = cc_apt_configure.find_apt_mirror_info( + cfg, FakeCloud(), 'amd64') self.assertEqual(mirrors['MIRROR'], pmir) @@ -594,7 +607,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): "security": [{'arches': ["default"], "uri": "nothis-security"}, {'arches': [arch], "uri": smir}]} - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, FakeCloud(), arch) self.assertEqual(mirrors['PRIMARY'], pmir) self.assertEqual(mirrors['MIRROR'], pmir) @@ -613,7 +626,8 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): {'arches': ["default"], "uri": smir}]} - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, 'amd64') + mirrors = cc_apt_configure.find_apt_mirror_info( + cfg, FakeCloud(), 'amd64') self.assertEqual(mirrors['MIRROR'], pmir) @@ -673,7 +687,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): with mock.patch.object(cc_apt_configure.util, 'search_for_mirror', side_effect=[pmir, smir]) as mocksearch: - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, + mirrors = cc_apt_configure.find_apt_mirror_info(cfg, FakeCloud(), 'amd64') calls = [call(["pfailme", pmir]), @@ -712,7 +726,8 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase): # should not be called, since primary is specified with mock.patch.object(cc_apt_configure.util, 'search_for_mirror') as mockse: - mirrors = cc_apt_configure.find_apt_mirror_info(cfg, None, arch) + mirrors = cc_apt_configure.find_apt_mirror_info( + cfg, FakeCloud(), arch) mockse.assert_not_called() self.assertEqual(mirrors['MIRROR'], -- cgit v1.2.3