summaryrefslogtreecommitdiff
path: root/cloudinit/distros/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/distros/__init__.py')
-rwxr-xr-xcloudinit/distros/__init__.py109
1 files changed, 107 insertions, 2 deletions
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index 92598a2d..86a22c3e 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -13,6 +13,8 @@ import abc
import os
import re
import stat
+import string
+import urllib.parse
from io import StringIO
from cloudinit import importer
@@ -50,6 +52,9 @@ _EC2_AZ_RE = re.compile('^[a-z][a-z]-(?:[a-z]+-)+[0-9][a-z]$')
# Default NTP Client Configurations
PREFERRED_NTP_CLIENTS = ['chrony', 'systemd-timesyncd', 'ntp', 'ntpdate']
+# Letters/Digits/Hyphen characters, for use in domain name validation
+LDH_ASCII_CHARS = string.ascii_letters + string.digits + "-"
+
class Distro(metaclass=abc.ABCMeta):
@@ -720,6 +725,102 @@ class Distro(metaclass=abc.ABCMeta):
LOG.info("Added user '%s' to group '%s'", member, name)
+def _apply_hostname_transformations_to_url(url: str, transformations: list):
+ """
+ Apply transformations to a URL's hostname, return transformed URL.
+
+ This is a separate function because unwrapping and rewrapping only the
+ hostname portion of a URL is complex.
+
+ :param url:
+ The URL to operate on.
+ :param transformations:
+ A list of ``(str) -> Optional[str]`` functions, which will be applied
+ in order to the hostname portion of the URL. If any function
+ (regardless of ordering) returns None, ``url`` will be returned without
+ any modification.
+
+ :return:
+ A string whose value is ``url`` with the hostname ``transformations``
+ applied, or ``None`` if ``url`` is unparseable.
+ """
+ try:
+ parts = urllib.parse.urlsplit(url)
+ except ValueError:
+ # If we can't even parse the URL, we shouldn't use it for anything
+ return None
+ new_hostname = parts.hostname
+
+ for transformation in transformations:
+ new_hostname = transformation(new_hostname)
+ if new_hostname is None:
+ # If a transformation returns None, that indicates we should abort
+ # processing and return `url` unmodified
+ return url
+
+ new_netloc = new_hostname
+ if parts.port is not None:
+ new_netloc = "{}:{}".format(new_netloc, parts.port)
+ return urllib.parse.urlunsplit(parts._replace(netloc=new_netloc))
+
+
+def _sanitize_mirror_url(url: str):
+ """
+ Given a mirror URL, replace or remove any invalid URI characters.
+
+ This performs the following actions on the URL's hostname:
+ * Checks if it is an IP address, returning the URL immediately if it is
+ * Converts it to its IDN form (see below for details)
+ * Replaces any non-Letters/Digits/Hyphen (LDH) characters in it with
+ hyphens
+ * TODO: Remove any leading/trailing hyphens from each domain name label
+
+ Before we replace any invalid domain name characters, we first need to
+ ensure that any valid non-ASCII characters in the hostname will not be
+ replaced, by ensuring the hostname is in its Internationalized domain name
+ (IDN) representation (see RFC 5890). This conversion has to be applied to
+ the whole hostname (rather than just the substitution variables), because
+ the Punycode algorithm used by IDNA transcodes each part of the hostname as
+ a whole string (rather than encoding individual characters). It cannot be
+ applied to the whole URL, because (a) the Punycode algorithm expects to
+ operate on domain names so doesn't output a valid URL, and (b) non-ASCII
+ characters in non-hostname parts of the URL aren't encoded via Punycode.
+
+ To put this in RFC 5890's terminology: before we remove or replace any
+ characters from our domain name (which we do to ensure that each label is a
+ valid LDH Label), we first ensure each label is in its A-label form.
+
+ (Note that Python's builtin idna encoding is actually IDNA2003, not
+ IDNA2008. This changes the specifics of how some characters are encoded to
+ ASCII, but doesn't affect the logic here.)
+
+ :param url:
+ The URL to operate on.
+
+ :return:
+ A sanitized version of the URL, which will have been IDNA encoded if
+ necessary, or ``None`` if the generated string is not a parseable URL.
+ """
+ # Acceptable characters are LDH characters, plus "." to separate each label
+ acceptable_chars = LDH_ASCII_CHARS + "."
+ transformations = [
+ # This is an IP address, not a hostname, so no need to apply the
+ # transformations
+ lambda hostname: None if net.is_ip_address(hostname) else hostname,
+
+ # Encode with IDNA to get the correct characters (as `bytes`), then
+ # decode with ASCII so we return a `str`
+ lambda hostname: hostname.encode('idna').decode('ascii'),
+
+ # Replace any unacceptable characters with "-"
+ lambda hostname: ''.join(
+ c if c in acceptable_chars else "-" for c in hostname
+ ),
+ ]
+
+ return _apply_hostname_transformations_to_url(url, transformations)
+
+
def _get_package_mirror_info(mirror_info, data_source=None,
mirror_filter=util.search_for_mirror):
# given a arch specific 'mirror_info' entry (from package_mirrors)
@@ -748,9 +849,13 @@ def _get_package_mirror_info(mirror_info, data_source=None,
mirrors = []
for tmpl in searchlist:
try:
- mirrors.append(tmpl % subst)
+ mirror = tmpl % subst
except KeyError:
- pass
+ continue
+
+ mirror = _sanitize_mirror_url(mirror)
+ if mirror is not None:
+ mirrors.append(mirror)
found = mirror_filter(mirrors)
if found: