diff options
| -rw-r--r-- | cloudinit/config/cc_apt_configure.py | 135 | ||||
| -rw-r--r-- | cloudinit/gpg.py | 30 | ||||
| -rw-r--r-- | doc/examples/cloud-config-apt.txt | 24 | ||||
| -rw-r--r-- | tests/integration_tests/modules/test_apt.py | 62 | ||||
| -rw-r--r-- | tests/unittests/test_gpg.py | 81 | ||||
| -rw-r--r-- | tests/unittests/test_handler/test_handler_apt_key.py | 137 | ||||
| -rw-r--r-- | tests/unittests/test_handler/test_handler_apt_source_v1.py | 75 | ||||
| -rw-r--r-- | tests/unittests/test_handler/test_handler_apt_source_v3.py | 85 | 
8 files changed, 548 insertions, 81 deletions
| diff --git a/cloudinit/config/cc_apt_configure.py b/cloudinit/config/cc_apt_configure.py index 0c9c7925..c3c48bbd 100644 --- a/cloudinit/config/cc_apt_configure.py +++ b/cloudinit/config/cc_apt_configure.py @@ -11,6 +11,7 @@  import glob  import os  import re +import pathlib  from textwrap import dedent  from cloudinit.config.schema import ( @@ -27,6 +28,10 @@ LOG = logging.getLogger(__name__)  # this will match 'XXX:YYY' (ie, 'cloud-archive:foo' or 'ppa:bar')  ADD_APT_REPO_MATCH = r"^[\w-]+:\w" +APT_LOCAL_KEYS = '/etc/apt/trusted.gpg' +APT_TRUSTED_GPG_DIR = '/etc/apt/trusted.gpg.d/' +CLOUD_INIT_GPG_DIR = '/etc/apt/cloud-init.gpg.d/' +  frequency = PER_INSTANCE  distros = ["ubuntu", "debian"]  mirror_property = { @@ -139,7 +144,7 @@ schema = {                source1:                    keyid: 'keyid'                    keyserver: 'keyserverurl' -                  source: 'deb http://<url>/ xenial main' +                  source: 'deb [signed-by=$KEY_FILE] http://<url>/ xenial main'                source2:                    source: 'ppa:<ppa-name>'                source3: @@ -312,7 +317,8 @@ schema = {                              - ``$MIRROR``                              - ``$RELEASE``                              - ``$PRIMARY`` -                            - ``$SECURITY``""") +                            - ``$SECURITY`` +                            - ``$KEY_FILE``""")                  },                  'conf': {                      'type': 'string', @@ -381,7 +387,8 @@ schema = {                              - ``$MIRROR``                              - ``$PRIMARY``                              - ``$SECURITY`` -                            - ``$RELEASE``""") +                            - ``$RELEASE`` +                            - ``$KEY_FILE``""")                  }              }          } @@ -683,7 +690,7 @@ def add_mirror_keys(cfg, target):      """Adds any keys included in the primary/security mirror clauses"""      for key in ('primary', 'security'):          for mirror in cfg.get(key, []): -            add_apt_key(mirror, target) +            add_apt_key(mirror, target, file_name=key)  def generate_sources_list(cfg, release, mirrors, cloud): @@ -714,20 +721,21 @@ def generate_sources_list(cfg, release, mirrors, cloud):      util.write_file(aptsrc, disabled, mode=0o644) -def add_apt_key_raw(key, target=None): +def add_apt_key_raw(key, file_name, hardened=False, target=None):      """      actual adding of a key as defined in key argument      to the system      """      LOG.debug("Adding key:\n'%s'", key)      try: -        subp.subp(['apt-key', 'add', '-'], data=key.encode(), target=target) +        name = pathlib.Path(file_name).stem +        return apt_key('add', output_file=name, data=key, hardened=hardened)      except subp.ProcessExecutionError:          LOG.exception("failed to add apt GPG Key to apt keyring")          raise -def add_apt_key(ent, target=None): +def add_apt_key(ent, target=None, hardened=False, file_name=None):      """      Add key to the system as defined in ent (if any).      Supports raw keys or keyid's @@ -741,7 +749,10 @@ def add_apt_key(ent, target=None):          ent['key'] = gpg.getkeybyid(ent['keyid'], keyserver)      if 'key' in ent: -        add_apt_key_raw(ent['key'], target) +        return add_apt_key_raw( +            ent['key'], +            file_name or ent['filename'], +            hardened=hardened)  def update_packages(cloud): @@ -751,9 +762,28 @@ def update_packages(cloud):  def add_apt_sources(srcdict, cloud, target=None, template_params=None,                      aa_repo_match=None):      """ -    add entries in /etc/apt/sources.list.d for each abbreviated -    sources.list entry in 'srcdict'.  When rendering template, also -    include the values in dictionary searchList +    install keys and repo source .list files defined in 'sources' + +    for each 'source' entry in the config: +        1. expand template variables and write source .list file in +                /etc/apt/sources.list.d/ +        2. install defined keys +        3. update packages via distro-specific method (i.e. apt-key update) + + +    @param srcdict: a dict containing elements required +    @param cloud: cloud instance object + +    Example srcdict value: +    { +    'rio-grande-repo': { +        'source': 'deb [signed-by=$KEY_FILE] $MIRROR $RELEASE main', +        'keyid': 'B59D 5F15 97A5 04B7 E230  6DCA 0620 BBCF 0368 3F77', +        'keyserver': 'pgp.mit.edu' +        } +    } + +    Note: Deb822 format is not supported      """      if template_params is None:          template_params = {} @@ -770,7 +800,11 @@ def add_apt_sources(srcdict, cloud, target=None, template_params=None,          if 'filename' not in ent:              ent['filename'] = filename -        add_apt_key(ent, target) +        if 'source' in ent and '$KEY_FILE' in ent['source']: +            key_file = add_apt_key(ent, target, hardened=True) +            template_params['KEY_FILE'] = key_file +        else: +            key_file = add_apt_key(ent, target)          if 'source' not in ent:              continue @@ -1006,7 +1040,7 @@ def get_arch_mirrorconfig(cfg, mirrortype, arch):      # select the specification matching the target arch      default = None      for mirror_cfg_elem in mirror_cfg_list: -        arches = mirror_cfg_elem.get("arches") +        arches = mirror_cfg_elem.get("arches", [])          if arch in arches:              return mirror_cfg_elem          if "default" in arches: @@ -1089,6 +1123,81 @@ def apply_apt_config(cfg, proxy_fname, config_fname):          LOG.debug("no apt config configured, removed %s", config_fname) +def apt_key(command, output_file=None, data=None, hardened=False, +            human_output=True): +    """apt-key replacement + +    commands implemented: 'add', 'list', 'finger' + +    @param output_file: name of output gpg file (without .gpg or .asc) +    @param data: key contents +    @param human_output: list keys formatted for human parsing +    @param hardened: write keys to to /etc/apt/cloud-init.gpg.d/ (referred to +    with [signed-by] in sources file) +    """ + +    def _get_key_files(): +        """return all apt keys + +        /etc/apt/trusted.gpg (if it exists) and all keyfiles (and symlinks to +        keyfiles) in /etc/apt/trusted.gpg.d/ are returned + +        based on apt-key implementation +        """ +        key_files = [APT_LOCAL_KEYS] if os.path.isfile(APT_LOCAL_KEYS) else [] + +        for file in os.listdir(APT_TRUSTED_GPG_DIR): +            if file.endswith('.gpg') or file.endswith('.asc'): +                key_files.append(APT_TRUSTED_GPG_DIR + file) +        return key_files if key_files else '' + +    def apt_key_add(): +        """apt-key add <file> + +        returns filepath to new keyring, or '/dev/null' when an error occurs +        """ +        file_name = '/dev/null' +        if not output_file: +            util.logexc( +                LOG, 'Unknown filename, failed to add key: "{}"'.format(data)) +        else: +            try: +                key_dir = \ +                    CLOUD_INIT_GPG_DIR if hardened else APT_TRUSTED_GPG_DIR +                stdout = gpg.dearmor(data) +                file_name = '{}{}.gpg'.format(key_dir, output_file) +                util.write_file(file_name, stdout) +            except subp.ProcessExecutionError: +                util.logexc(LOG, 'Gpg error, failed to add key: {}'.format( +                    data)) +            except UnicodeDecodeError: +                util.logexc(LOG, 'Decode error, failed to add key: {}'.format( +                    data)) +        return file_name + +    def apt_key_list(): +        """apt-key list + +        returns string of all trusted keys (in /etc/apt/trusted.gpg and +        /etc/apt/trusted.gpg.d/) +        """ +        key_list = [] +        for key_file in _get_key_files(): +            try: +                key_list.append(gpg.list(key_file, human_output=human_output)) +            except subp.ProcessExecutionError as error: +                LOG.warning('Failed to list key "%s": %s', key_file, error) +        return '\n'.join(key_list) + +    if command == 'add': +        return apt_key_add() +    elif command == 'finger' or command == 'list': +        return apt_key_list() +    else: +        raise ValueError( +            'apt_key() commands add, list, and finger are currently supported') + +  CONFIG_CLEANERS = {      'cloud-init': clean_cloud_init,  } diff --git a/cloudinit/gpg.py b/cloudinit/gpg.py index 3780326c..07d682d2 100644 --- a/cloudinit/gpg.py +++ b/cloudinit/gpg.py @@ -14,6 +14,9 @@ import time  LOG = logging.getLogger(__name__) +GPG_LIST = ['gpg', '--with-fingerprint', '--no-default-keyring', '--list-keys', +            '--keyring'] +  def export_armour(key):      """Export gpg key, armoured key gets returned""" @@ -27,6 +30,33 @@ def export_armour(key):      return armour +def dearmor(key): +    """Dearmor gpg key, dearmored key gets returned + +        note: man gpg(1) makes no mention of an --armour spelling, only --armor +    """ +    return subp.subp(["gpg", "--dearmor"], data=key, decode=False)[0] + + +def list(key_file, human_output=False): +    """List keys from a keyring with fingerprints. Default to a stable machine +    parseable format. + +    @param key_file: a string containing a filepath to a key +    @param human_output: return output intended for human parsing +    """ +    cmd = [] +    cmd.extend(GPG_LIST) +    if not human_output: +        cmd.append('--with-colons') + +    cmd.append(key_file) +    (stdout, stderr) = subp.subp(cmd, capture=True) +    if stderr: +        LOG.warning('Failed to export armoured key "%s": %s', key_file, stderr) +    return stdout + +  def recv_key(key, keyserver, retries=(1, 1)):      """Receive gpg key from the specified keyserver. diff --git a/doc/examples/cloud-config-apt.txt b/doc/examples/cloud-config-apt.txt index f4392326..7baa141c 100644 --- a/doc/examples/cloud-config-apt.txt +++ b/doc/examples/cloud-config-apt.txt @@ -149,6 +149,7 @@ apt:    # security is optional, if not defined it is set to the same value as primary    security:      - uri: http://security.ubuntu.com/ubuntu +    - arches: [default]    # If search_dns is set for security the searched pattern is:    #   <distro>-security-mirror @@ -212,14 +213,14 @@ apt:    #    # The key of each source entry is the filename and will be prepended by    # /etc/apt/sources.list.d/ if it doesn't start with a '/'. -  # If it doesn't end with .list it will be appended so that apt picks up it's +  # If it doesn't end with .list it will be appended so that apt picks up its    # configuration.    #    # Whenever there is no content to be written into such a file, the key is    # not used as filename - yet it can still be used as index for merging    # configuration.    # -  # The values inside the entries consost of the following optional entries: +  # The values inside the entries consist of the following optional entries:    #   'source': a sources.list entry (some variable replacements apply)    #   'keyid': providing a key to import via shortid or fingerprint    #   'key': providing a raw PGP key @@ -276,13 +277,14 @@ apt:      my-repo2.list:        # 2.4 replacement variables        # -      # sources can use $MIRROR, $PRIMARY, $SECURITY and $RELEASE replacement -      # variables. +      # sources can use $MIRROR, $PRIMARY, $SECURITY, $RELEASE and $KEY_FILE +      # replacement variables.        # They will be replaced with the default or specified mirrors and the        # running release.        # The entry below would be possibly turned into:        #   source: deb http://archive.ubuntu.com/ubuntu xenial multiverse -      source: deb $MIRROR $RELEASE multiverse +      source: deb [signed-by=$KEY_FILE] $MIRROR $RELEASE multiverse +      keyid: F430BBA5      my-repo3.list:        # this would have the same end effect as 'ppa:curtin-dev/test-archive' @@ -310,9 +312,19 @@ apt:        keyid: B59D 5F15 97A5 04B7 E230  6DCA 0620 BBCF 0368 3F77        keyserver: pgp.mit.edu +    ignored5: +      # 2.8 signed-by +      # +      # One can specify [signed-by=$KEY_FILE] in the source definition, which +      # will make the key be installed in the directory /etc/cloud-init.gpg.d/ +      # and the $KEY_FILE replacement variable will be replaced with the path +      # to the specified key. If $KEY_FILE is used, but no key is specified, +      # apt update will (rightfully) fail due to an invalid value. +      source: deb [signed-by=$KEY_FILE] $MIRROR $RELEASE multiverse +      keyid: B59D 5F15 97A5 04B7 E230  6DCA 0620 BBCF 0368 3F77      my-repo4.list: -      # 2.8 raw key +      # 2.9 raw key        #        # The apt signing key can also be specified by providing a pgp public key        # block. Providing the PGP key this way is the most robust method for diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py index 54711fc0..2c388047 100644 --- a/tests/integration_tests/modules/test_apt.py +++ b/tests/integration_tests/modules/test_apt.py @@ -1,9 +1,11 @@  """Series of integration tests covering apt functionality."""  import re -from tests.integration_tests.clouds import ImageSpecification  import pytest +from cloudinit.config import cc_apt_configure +from cloudinit import gpg +from tests.integration_tests.clouds import ImageSpecification  from tests.integration_tests.instances import IntegrationInstance @@ -43,6 +45,13 @@ apt:        keyid: 441614D8        keyserver: keyserver.ubuntu.com        source: "ppa:simplestreams-dev/trunk" +    test_signed_by: +      keyid: A2EB2DEC0BD7519B7B38BE38376A290EC8068B11 +      keyserver: keyserver.ubuntu.com +      source: "deb [signed-by=$KEY_FILE] http://ppa.launchpad.net/juju/stable/ubuntu $RELEASE main" +    test_bad_key: +      key: "" +      source: "deb $MIRROR $RELEASE main"      test_key:        source: "deb http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu $RELEASE main"        key: | @@ -91,12 +100,27 @@ TEST_KEYSERVER_KEY = "7260 0DB1 5B8E 4C8B 1964  B868 038A CC97 C660 A937"  TEST_PPA_KEY = "3552 C902 B4DD F7BD 3842  1821 015D 28D7 4416 14D8"  TEST_KEY = "1FF0 D853 5EF7 E719 E5C8  1B9C 083D 06FB E4D3 04DF" +TEST_SIGNED_BY_KEY = "A2EB 2DEC 0BD7 519B 7B38  BE38 376A 290E C806 8B11"  @pytest.mark.ci  @pytest.mark.ubuntu  @pytest.mark.user_data(USER_DATA)  class TestApt: +    def get_keys(self, class_client: IntegrationInstance): +        """Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg +        in human readable format. Mimics the output of apt-key finger +        """ +        list_cmd = ' '.join(gpg.GPG_LIST) + ' ' +        keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS) +        print(keys) +        files = class_client.execute( +            'ls ' + cc_apt_configure.APT_TRUSTED_GPG_DIR) +        for file in files.split(): +            path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file +            keys += class_client.execute(list_cmd + path) or '' +        return keys +      def test_sources_list(self, class_client: IntegrationInstance):          """Integration test for the apt module's `sources_list` functionality. @@ -152,8 +176,33 @@ class TestApt:              'http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu'          ) in ppa_path_contents -        keys = class_client.execute('apt-key finger') -        assert TEST_PPA_KEY in keys +        assert TEST_PPA_KEY in self.get_keys(class_client) + +    def test_signed_by(self, class_client: IntegrationInstance): +        """Test the apt signed-by functionality. +        """ +        release = ImageSpecification.from_os_image().release +        source = ( +            "deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] " +            "http://ppa.launchpad.net/juju/stable/ubuntu" +            " {} main".format(release)) +        print(class_client.execute('cat /var/log/cloud-init.log')) +        path_contents = class_client.read_from_file( +            '/etc/apt/sources.list.d/test_signed_by.list') +        assert path_contents == source + +        key = class_client.execute( +            'gpg --no-default-keyring --with-fingerprint --list-keys ' +            '--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg') + +        assert TEST_SIGNED_BY_KEY in key + +    def test_bad_key(self, class_client: IntegrationInstance): +        """Test the apt signed-by functionality. +        """ +        with pytest.raises(OSError): +            class_client.read_from_file( +                '/etc/apt/trusted.list.d/test_bad_key.gpg')      def test_key(self, class_client: IntegrationInstance):          """Test the apt key functionality. @@ -168,9 +217,7 @@ class TestApt:          assert (              'http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu'          ) in test_archive_contents - -        keys = class_client.execute('apt-key finger') -        assert TEST_KEY in keys +        assert TEST_KEY in self.get_keys(class_client)      def test_keyserver(self, class_client: IntegrationInstance):          """Test the apt keyserver functionality. @@ -186,8 +233,7 @@ class TestApt:              'http://ppa.launchpad.net/cloud-init-raharper/curtin-dev/ubuntu'          ) in test_keyserver_contents -        keys = class_client.execute('apt-key finger') -        assert TEST_KEYSERVER_KEY in keys +        assert TEST_KEYSERVER_KEY in self.get_keys(class_client)      def test_os_pipelining(self, class_client: IntegrationInstance):          """Test 'os' settings does not write apt config file. diff --git a/tests/unittests/test_gpg.py b/tests/unittests/test_gpg.py new file mode 100644 index 00000000..451ffa91 --- /dev/null +++ b/tests/unittests/test_gpg.py @@ -0,0 +1,81 @@ +import pytest +from unittest import mock + +from cloudinit import gpg +from cloudinit import subp + +TEST_KEY_HUMAN = ''' +/etc/apt/cloud-init.gpg.d/my_key.gpg +-------------------------------------------- +pub   rsa4096 2021-10-22 [SC] +      3A3E F34D FDED B3B7 F3FD  F603 F83F 7712 9A5E BD85 +uid           [ unknown] Brett Holman <brett.holman@canonical.com> +sub   rsa4096 2021-10-22 [A] +sub   rsa4096 2021-10-22 [E] +''' + +TEST_KEY_MACHINE = ''' +tru::1:1635129362:0:3:1:5 +pub:-:4096:1:F83F77129A5EBD85:1634912922:::-:::scESCA::::::23::0: +fpr:::::::::3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85: +uid:-::::1634912922::64F1F1D6FA96316752D635D7C6406C52C40713C7::Brett Holman \ +<brett.holman@canonical.com>::::::::::0: +sub:-:4096:1:544B39C9A9141F04:1634912922::::::a::::::23: +fpr:::::::::8BD901490D6EC986D03D6F0D544B39C9A9141F04: +sub:-:4096:1:F45D9443F0A87092:1634912922::::::e::::::23: +fpr:::::::::8CCCB332317324F030A45B19F45D9443F0A87092: +''' + +TEST_KEY_FINGERPRINT_HUMAN = \ +    '3A3E F34D FDED B3B7 F3FD  F603 F83F 7712 9A5E BD85' + +TEST_KEY_FINGERPRINT_MACHINE = \ +    '3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85' + + +class TestGPGCommands: +    def test_dearmor_bad_value(self): +        """This exception is handled by the callee. Ensure it is not caught +        internally. +        """ +        with mock.patch.object( +                subp, +                'subp', +                side_effect=subp.ProcessExecutionError): +            with pytest.raises(subp.ProcessExecutionError): +                gpg.dearmor('garbage key value') + +    def test_gpg_list_args(self): +        """Verify correct command gets called to list keys +        """ +        no_colons = [ +            'gpg', +            '--with-fingerprint', +            '--no-default-keyring', +            '--list-keys', +            '--keyring', +            'key'] +        colons = [ +            'gpg', +            '--with-fingerprint', +            '--no-default-keyring', +            '--list-keys', +            '--keyring', +            '--with-colons', +            'key'] +        with mock.patch.object(subp, 'subp', return_value=('', '')) as m_subp: +            gpg.list('key') +            assert mock.call(colons, capture=True) == m_subp.call_args + +            gpg.list('key', human_output=True) +            test_calls = mock.call((no_colons), capture=True) +            assert test_calls == m_subp.call_args + +    def test_gpg_dearmor_args(self): +        """Verify correct command gets called to dearmor keys +        """ +        with mock.patch.object(subp, 'subp', return_value=('', '')) as m_subp: +            gpg.dearmor('key') +            test_call = mock.call( +                ["gpg", "--dearmor"], data='key', decode=False) +            assert test_call == m_subp.call_args diff --git a/tests/unittests/test_handler/test_handler_apt_key.py b/tests/unittests/test_handler/test_handler_apt_key.py new file mode 100644 index 00000000..00e5a38d --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_key.py @@ -0,0 +1,137 @@ +import os +from unittest import mock + +from cloudinit.config import cc_apt_configure +from cloudinit import subp +from cloudinit import util + +TEST_KEY_HUMAN = ''' +/etc/apt/cloud-init.gpg.d/my_key.gpg +-------------------------------------------- +pub   rsa4096 2021-10-22 [SC] +      3A3E F34D FDED B3B7 F3FD  F603 F83F 7712 9A5E BD85 +uid           [ unknown] Brett Holman <brett.holman@canonical.com> +sub   rsa4096 2021-10-22 [A] +sub   rsa4096 2021-10-22 [E] +''' + +TEST_KEY_MACHINE = ''' +tru::1:1635129362:0:3:1:5 +pub:-:4096:1:F83F77129A5EBD85:1634912922:::-:::scESCA::::::23::0: +fpr:::::::::3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85: +uid:-::::1634912922::64F1F1D6FA96316752D635D7C6406C52C40713C7::Brett Holman \ +<brett.holman@canonical.com>::::::::::0: +sub:-:4096:1:544B39C9A9141F04:1634912922::::::a::::::23: +fpr:::::::::8BD901490D6EC986D03D6F0D544B39C9A9141F04: +sub:-:4096:1:F45D9443F0A87092:1634912922::::::e::::::23: +fpr:::::::::8CCCB332317324F030A45B19F45D9443F0A87092: +''' + +TEST_KEY_FINGERPRINT_HUMAN = \ +    '3A3E F34D FDED B3B7 F3FD  F603 F83F 7712 9A5E BD85' + +TEST_KEY_FINGERPRINT_MACHINE = \ +    '3A3EF34DFDEDB3B7F3FDF603F83F77129A5EBD85' + + +class TestAptKey: +    """TestAptKey +    Class to test apt-key commands +    """ +    @mock.patch.object(subp, 'subp', return_value=('fakekey', '')) +    @mock.patch.object(util, 'write_file') +    def _apt_key_add_success_helper(self, directory, *args, hardened=False): +        file = cc_apt_configure.apt_key( +            'add', +            output_file='my-key', +            data='fakekey', +            hardened=hardened) +        assert file == directory + '/my-key.gpg' + +    def test_apt_key_add_success(self): +        """Verify the correct directory path gets returned for unhardened case +        """ +        self._apt_key_add_success_helper('/etc/apt/trusted.gpg.d') + +    def test_apt_key_add_success_hardened(self): +        """Verify the correct directory path gets returned for hardened case +        """ +        self._apt_key_add_success_helper( +            '/etc/apt/cloud-init.gpg.d', +            hardened=True) + +    def test_apt_key_add_fail_no_file_name(self): +        """Verify that null filename gets handled correctly +        """ +        file = cc_apt_configure.apt_key( +            'add', +            output_file=None, +            data='') +        assert '/dev/null' == file + +    def _apt_key_fail_helper(self): +        file = cc_apt_configure.apt_key( +            'add', +            output_file='my-key', +            data='fakekey') +        assert file == '/dev/null' + +    @mock.patch.object(subp, 'subp', side_effect=subp.ProcessExecutionError) +    def test_apt_key_add_fail_no_file_name_subproc(self, *args): +        """Verify that bad key value gets handled correctly +        """ +        self._apt_key_fail_helper() + +    @mock.patch.object( +        subp, 'subp', side_effect=UnicodeDecodeError('test', b'', 1, 1, '')) +    def test_apt_key_add_fail_no_file_name_unicode(self, *args): +        """Verify that bad key encoding gets handled correctly +        """ +        self._apt_key_fail_helper() + +    def _apt_key_list_success_helper(self, finger, key, human_output=True): +        @mock.patch.object(os, 'listdir', return_value=('/fake/dir/key.gpg',)) +        @mock.patch.object(subp, 'subp', return_value=(key, '')) +        def mocked_list(*a): + +            keys = cc_apt_configure.apt_key('list', human_output) +            assert finger in keys +        mocked_list() + +    def test_apt_key_list_success_human(self): +        """Verify expected key output, human +        """ +        self._apt_key_list_success_helper( +            TEST_KEY_FINGERPRINT_HUMAN, +            TEST_KEY_HUMAN) + +    def test_apt_key_list_success_machine(self): +        """Verify expected key output, machine +        """ +        self._apt_key_list_success_helper( +            TEST_KEY_FINGERPRINT_MACHINE, +            TEST_KEY_MACHINE, human_output=False) + +    @mock.patch.object(os, 'listdir', return_value=()) +    @mock.patch.object(subp, 'subp', return_value=('', '')) +    def test_apt_key_list_fail_no_keys(self, *args): +        """Ensure falsy output for no keys +        """ +        keys = cc_apt_configure.apt_key('list') +        assert not keys + +    @mock.patch.object(os, 'listdir', return_value=('file_not_gpg_key.txt')) +    @mock.patch.object(subp, 'subp', return_value=('', '')) +    def test_apt_key_list_fail_no_keys_file(self, *args): +        """Ensure non-gpg file is not returned. + +        apt-key used file extensions for this, so we do too +        """ +        assert not cc_apt_configure.apt_key('list') + +    @mock.patch.object(subp, 'subp', side_effect=subp.ProcessExecutionError) +    @mock.patch.object(os, 'listdir', return_value=('bad_gpg_key.gpg')) +    def test_apt_key_list_fail_bad_key_file(self, *args): +        """Ensure bad gpg key doesn't throw exeption. +        """ +        assert not cc_apt_configure.apt_key('list') diff --git a/tests/unittests/test_handler/test_handler_apt_source_v1.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py index 367971cb..2357d699 100644 --- a/tests/unittests/test_handler/test_handler_apt_source_v1.py +++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py @@ -9,6 +9,7 @@ import os  import re  import shutil  import tempfile +import pathlib  from unittest import mock  from unittest.mock import call @@ -279,16 +280,16 @@ class TestAptSourceConfig(TestCase):          """          cfg = self.wrapv1conf(cfg) -        with mock.patch.object(subp, 'subp', -                               return_value=('fakekey 1234', '')) as mockobj: +        with mock.patch.object(cc_apt_configure, 'add_apt_key') as mockobj:              cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) -        # check if it added the right ammount of keys +        # check if it added the right number of keys          calls = [] -        for _ in range(keynum): -            calls.append(call(['apt-key', 'add', '-'], -                              data=b'fakekey 1234', -                              target=None)) +        sources = cfg['apt']['sources'] +        for src in sources: +            print(sources[src]) +            calls.append(call(sources[src], None)) +          mockobj.assert_has_calls(calls, any_order=True)          self.assertTrue(os.path.isfile(filename)) @@ -364,11 +365,17 @@ class TestAptSourceConfig(TestCase):          """          cfg = self.wrapv1conf([cfg]) -        with mock.patch.object(subp, 'subp') as mockobj: +        with mock.patch.object(cc_apt_configure, 'add_apt_key') as mockobj:              cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) -        mockobj.assert_called_with(['apt-key', 'add', '-'], -                                   data=b'fakekey 4321', target=None) +        # check if it added the right amount of keys +        sources = cfg['apt']['sources'] +        calls = [] +        for src in sources: +            print(sources[src]) +            calls.append(call(sources[src], None)) + +        mockobj.assert_has_calls(calls, any_order=True)          self.assertTrue(os.path.isfile(filename)) @@ -405,12 +412,15 @@ class TestAptSourceConfig(TestCase):          cfg = {'key': "fakekey 4242",                 'filename': self.aptlistfile}          cfg = self.wrapv1conf([cfg]) - -        with mock.patch.object(subp, 'subp') as mockobj: +        with mock.patch.object(cc_apt_configure, 'apt_key') as mockobj:              cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) -        mockobj.assert_called_once_with(['apt-key', 'add', '-'], -                                        data=b'fakekey 4242', target=None) +        calls = (call( +            'add', +            output_file=pathlib.Path(self.aptlistfile).stem, +            data='fakekey 4242', +            hardened=False),) +        mockobj.assert_has_calls(calls, any_order=True)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -422,16 +432,26 @@ class TestAptSourceConfig(TestCase):          cfg = self.wrapv1conf([cfg])          with mock.patch.object(subp, 'subp', -                               return_value=('fakekey 1212', '')) as mockobj: -            cc_apt_configure.handle("test", cfg, self.fakecloud, None, None) - -        mockobj.assert_called_with(['apt-key', 'add', '-'], -                                   data=b'fakekey 1212', target=None) +                               return_value=('fakekey 1212', '')): +            with mock.patch.object(cc_apt_configure, 'apt_key') as mockobj: +                cc_apt_configure.handle( +                    "test", +                    cfg, +                    self.fakecloud, +                    None, +                    None) + +        calls = (call( +            'add', +            output_file=pathlib.Path(self.aptlistfile).stem, +            data='fakekey 1212', +            hardened=False),) +        mockobj.assert_has_calls(calls, any_order=True)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) -    def apt_src_keyid_real(self, cfg, expectedkey): +    def apt_src_keyid_real(self, cfg, expectedkey, is_hardened=None):          """apt_src_keyid_real          Test specification of a keyid without source including          up to addition of the key (add_apt_key_raw mocked to keep the @@ -446,9 +466,14 @@ class TestAptSourceConfig(TestCase):                                     return_value=expectedkey) as mockgetkey:                  cc_apt_configure.handle("test", cfg, self.fakecloud,                                          None, None) - +        if is_hardened is not None: +            mockkey.assert_called_with( +                expectedkey, +                self.aptlistfile, +                hardened=is_hardened) +        else: +            mockkey.assert_called_with(expectedkey, self.aptlistfile)          mockgetkey.assert_called_with(key, keyserver) -        mockkey.assert_called_with(expectedkey, None)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -459,7 +484,7 @@ class TestAptSourceConfig(TestCase):          cfg = {'keyid': keyid,                 'filename': self.aptlistfile} -        self.apt_src_keyid_real(cfg, EXPECTEDKEY) +        self.apt_src_keyid_real(cfg, EXPECTEDKEY, is_hardened=False)      def test_apt_src_longkeyid_real(self):          """test_apt_src_longkeyid_real - Test long keyid including key add""" @@ -467,7 +492,7 @@ class TestAptSourceConfig(TestCase):          cfg = {'keyid': keyid,                 'filename': self.aptlistfile} -        self.apt_src_keyid_real(cfg, EXPECTEDKEY) +        self.apt_src_keyid_real(cfg, EXPECTEDKEY, is_hardened=False)      def test_apt_src_longkeyid_ks_real(self):          """test_apt_src_longkeyid_ks_real - Test long keyid from other ks""" @@ -476,7 +501,7 @@ class TestAptSourceConfig(TestCase):                 'keyserver': 'keys.gnupg.net',                 'filename': self.aptlistfile} -        self.apt_src_keyid_real(cfg, EXPECTEDKEY) +        self.apt_src_keyid_real(cfg, EXPECTEDKEY, is_hardened=False)      def test_apt_src_ppa(self):          """Test adding a ppa""" diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py index d4db610f..20289121 100644 --- a/tests/unittests/test_handler/test_handler_apt_source_v3.py +++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py @@ -10,6 +10,7 @@ import re  import shutil  import socket  import tempfile +import pathlib  from unittest import TestCase, mock  from unittest.mock import call @@ -214,22 +215,24 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):                 self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}}          self._apt_src_replace_tri(cfg) -    def _apt_src_keyid(self, filename, cfg, keynum): +    def _apt_src_keyid(self, filename, cfg, keynum, is_hardened=None):          """_apt_src_keyid          Test specification of a source + keyid          """          params = self._get_default_params() -        with mock.patch("cloudinit.subp.subp", -                        return_value=('fakekey 1234', '')) as mockobj: +        with mock.patch.object(cc_apt_configure, 'add_apt_key') as mockobj:              self._add_apt_sources(cfg, TARGET, template_params=params,                                    aa_repo_match=self.matcher) -        # check if it added the right ammount of keys +        # check if it added the right number of keys          calls = [] -        for _ in range(keynum): -            calls.append(call(['apt-key', 'add', '-'], data=b'fakekey 1234', -                              target=TARGET)) +        for key in cfg: +            if is_hardened is not None: +                calls.append(call(cfg[key], hardened=is_hardened)) +            else: +                calls.append(call(cfg[key], TARGET)) +          mockobj.assert_has_calls(calls, any_order=True)          self.assertTrue(os.path.isfile(filename)) @@ -248,6 +251,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):                                               'http://ppa.launchpad.net/'                                               'smoser/cloud-init-test/ubuntu'                                               ' xenial main'), +                                  'filename': self.aptlistfile,                                    'keyid': "03683F77"}}          self._apt_src_keyid(self.aptlistfile, cfg, 1) @@ -268,6 +272,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):                                                'http://ppa.launchpad.net/'                                                'smoser/cloud-init-test/ubuntu'                                                ' xenial multiverse'), +                                   'filename': self.aptlistfile3,                                     'keyid': "03683F77"}}          self._apt_src_keyid(self.aptlistfile, cfg, 3) @@ -293,15 +298,19 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):                                               'http://ppa.launchpad.net/'                                               'smoser/cloud-init-test/ubuntu'                                               ' xenial main'), +                                  'filename': self.aptlistfile,                                    'key': "fakekey 4321"}} -        with mock.patch.object(subp, 'subp') as mockobj: +        with mock.patch.object(cc_apt_configure, 'apt_key') as mockobj:              self._add_apt_sources(cfg, TARGET, template_params=params,                                    aa_repo_match=self.matcher) -        mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4321', -                                target=TARGET) - +        calls = (call( +            'add', +            output_file=pathlib.Path(self.aptlistfile).stem, +            data='fakekey 4321', +            hardened=False),) +        mockobj.assert_has_calls(calls, any_order=True)          self.assertTrue(os.path.isfile(self.aptlistfile))          contents = util.load_file(self.aptlistfile) @@ -317,12 +326,16 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):          params = self._get_default_params()          cfg = {self.aptlistfile: {'key': "fakekey 4242"}} -        with mock.patch.object(subp, 'subp') as mockobj: +        with mock.patch.object(cc_apt_configure, 'apt_key') as mockobj:              self._add_apt_sources(cfg, TARGET, template_params=params,                                    aa_repo_match=self.matcher) -        mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 4242', -                                target=TARGET) +        calls = (call( +            'add', +            output_file=pathlib.Path(self.aptlistfile).stem, +            data='fakekey 4242', +            hardened=False),) +        mockobj.assert_has_calls(calls, any_order=True)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -331,19 +344,23 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):          """test_apt_v3_src_keyidonly - Test keyid without source"""          params = self._get_default_params()          cfg = {self.aptlistfile: {'keyid': "03683F77"}} -          with mock.patch.object(subp, 'subp', -                               return_value=('fakekey 1212', '')) as mockobj: -            self._add_apt_sources(cfg, TARGET, template_params=params, -                                  aa_repo_match=self.matcher) +                               return_value=('fakekey 1212', '')): +            with mock.patch.object(cc_apt_configure, 'apt_key') as mockobj: +                self._add_apt_sources(cfg, TARGET, template_params=params, +                                      aa_repo_match=self.matcher) -        mockobj.assert_any_call(['apt-key', 'add', '-'], data=b'fakekey 1212', -                                target=TARGET) +        calls = (call( +            'add', +            output_file=pathlib.Path(self.aptlistfile).stem, +            data='fakekey 1212', +            hardened=False),) +        mockobj.assert_has_calls(calls, any_order=True)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) -    def apt_src_keyid_real(self, cfg, expectedkey): +    def apt_src_keyid_real(self, cfg, expectedkey, is_hardened=None):          """apt_src_keyid_real          Test specification of a keyid without source including          up to addition of the key (add_apt_key_raw mocked to keep the @@ -361,7 +378,11 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):          mockgetkey.assert_called_with(keycfg['keyid'],                                        keycfg.get('keyserver',                                                   'keyserver.ubuntu.com')) -        mockkey.assert_called_with(expectedkey, TARGET) +        if is_hardened is not None: +            mockkey.assert_called_with( +                expectedkey, +                keycfg['keyfile'], +                hardened=is_hardened)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -369,21 +390,24 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):      def test_apt_v3_src_keyid_real(self):          """test_apt_v3_src_keyid_real - Test keyid including key add"""          keyid = "03683F77" -        cfg = {self.aptlistfile: {'keyid': keyid}} +        cfg = {self.aptlistfile: {'keyid': keyid, +                                  'keyfile': self.aptlistfile}} -        self.apt_src_keyid_real(cfg, EXPECTEDKEY) +        self.apt_src_keyid_real(cfg, EXPECTEDKEY, is_hardened=False)      def test_apt_v3_src_longkeyid_real(self):          """test_apt_v3_src_longkeyid_real Test long keyid including key add"""          keyid = "B59D 5F15 97A5 04B7 E230  6DCA 0620 BBCF 0368 3F77" -        cfg = {self.aptlistfile: {'keyid': keyid}} +        cfg = {self.aptlistfile: {'keyid': keyid, +                                  'keyfile': self.aptlistfile}} -        self.apt_src_keyid_real(cfg, EXPECTEDKEY) +        self.apt_src_keyid_real(cfg, EXPECTEDKEY, is_hardened=False)      def test_apt_v3_src_longkeyid_ks_real(self):          """test_apt_v3_src_longkeyid_ks_real Test long keyid from other ks"""          keyid = "B59D 5F15 97A5 04B7 E230  6DCA 0620 BBCF 0368 3F77"          cfg = {self.aptlistfile: {'keyid': keyid, +                                  'keyfile': self.aptlistfile,                                    'keyserver': 'keys.gnupg.net'}}          self.apt_src_keyid_real(cfg, EXPECTEDKEY) @@ -393,6 +417,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):          keyid = "03683F77"          params = self._get_default_params()          cfg = {self.aptlistfile: {'keyid': keyid, +                                  'keyfile': self.aptlistfile,                                    'keyserver': 'test.random.com'}}          # in some test environments only *.ubuntu.com is reachable @@ -405,7 +430,7 @@ class TestAptSourceConfig(t_help.FilesystemMockingTestCase):                                        aa_repo_match=self.matcher)          mockgetkey.assert_called_with('03683F77', 'test.random.com') -        mockadd.assert_called_with('fakekey', TARGET) +        mockadd.assert_called_with('fakekey', self.aptlistfile, hardened=False)          # filename should be ignored on key only          self.assertFalse(os.path.isfile(self.aptlistfile)) @@ -1002,10 +1027,12 @@ deb http://ubuntu.com/ubuntu/ xenial-proposed main""")              'primary': [                  {'arches': [arch],                   'uri': 'http://test.ubuntu.com/', +                 'filename': 'primary',                   'key': 'fakekey_primary'}],              'security': [                  {'arches': [arch],                   'uri': 'http://testsec.ubuntu.com/', +                 'filename': 'security',                   'key': 'fakekey_security'}]          } @@ -1013,8 +1040,8 @@ deb http://ubuntu.com/ubuntu/ xenial-proposed main""")                                 'add_apt_key_raw') as mockadd:              cc_apt_configure.add_mirror_keys(cfg, TARGET)          calls = [ -            mock.call('fakekey_primary', TARGET), -            mock.call('fakekey_security', TARGET), +            mock.call('fakekey_primary', 'primary', hardened=False), +            mock.call('fakekey_security', 'security', hardened=False),          ]          mockadd.assert_has_calls(calls, any_order=True) | 
