# This file is part of cloud-init. See LICENSE file for license information. """Setup image for testing.""" from functools import partial import os from cloudinit import util as c_util from tests.cloud_tests import LOG from tests.cloud_tests import stage, util def installed_package_version(image, package, ensure_installed=True): """Get installed version of package. @param image: cloud_tests.images instance to operate on @param package: name of package @param ensure_installed: raise error if not installed @return_value: cloud-init version string """ os_family = util.get_os_family(image.properties['os']) if os_family == 'debian': cmd = ['dpkg-query', '-W', "--showformat=${Version}", package] elif os_family == 'redhat': cmd = ['rpm', '-q', '--queryformat', "'%{VERSION}'", package] else: raise NotImplementedError msg = 'query version for package: {}'.format(package) (out, err, exit) = image.execute( cmd, description=msg, rcs=(0,) if ensure_installed else range(0, 256)) return out.strip() def install_deb(args, image): """Install deb into image. @param args: cmdline arguments, must contain --deb @param image: cloud_tests.images instance to operate on @return_value: None, may raise errors """ # ensure system is compatible with package format os_family = util.get_os_family(image.properties['os']) if os_family != 'debian': raise NotImplementedError('install deb: {} not supported on os ' 'family: {}'.format(args.deb, os_family)) # install deb msg = 'install deb: "{}" into target'.format(args.deb) LOG.debug(msg) remote_path = os.path.join('/tmp', os.path.basename(args.deb)) image.push_file(args.deb, remote_path) cmd = 'dpkg -i {}; apt-get install --yes -f'.format(remote_path) image.execute(cmd, description=msg) # check installed deb version matches package fmt = ['-W', "--showformat=${Version}"] (out, err, exit) = image.execute(['dpkg-deb'] + fmt + [remote_path]) expected_version = out.strip() found_version = installed_package_version(image, 'cloud-init') if expected_version != found_version: raise OSError('install deb version "{}" does not match expected "{}"' .format(found_version, expected_version)) LOG.debug('successfully installed: %s, version: %s', args.deb, found_version) def install_rpm(args, image): """Install rpm into image. @param args: cmdline arguments, must contain --rpm @param image: cloud_tests.images instance to operate on @return_value: None, may raise errors """ os_family = util.get_os_family(image.properties['os']) if os_family != 'redhat': raise NotImplementedError('install rpm: {} not supported on os ' 'family: {}'.format(args.rpm, os_family)) # install rpm msg = 'install rpm: "{}" into target'.format(args.rpm) LOG.debug(msg) remote_path = os.path.join('/tmp', os.path.basename(args.rpm)) image.push_file(args.rpm, remote_path) image.execute(['rpm', '-U', remote_path], description=msg) fmt = ['--queryformat', '"%{VERSION}"'] (out, err, exit) = image.execute(['rpm', '-q'] + fmt + [remote_path]) expected_version = out.strip() found_version = installed_package_version(image, 'cloud-init') if expected_version != found_version: raise OSError('install rpm version "{}" does not match expected "{}"' .format(found_version, expected_version)) LOG.debug('successfully installed: %s, version %s', args.rpm, found_version) def upgrade(args, image): """Upgrade or install cloud-init from repo. @param args: cmdline arguments @param image: cloud_tests.images instance to operate on @return_value: None, may raise errors """ os_family = util.get_os_family(image.properties['os']) if os_family == 'debian': cmd = 'apt-get update && apt-get install cloud-init --yes' elif os_family == 'redhat': cmd = 'sleep 10 && yum install cloud-init --assumeyes' else: raise NotImplementedError msg = 'upgrading cloud-init' LOG.debug(msg) image.execute(cmd, description=msg) def upgrade_full(args, image): """Run the system's full upgrade command. @param args: cmdline arguments @param image: cloud_tests.images instance to operate on @return_value: None, may raise errors """ os_family = util.get_os_family(image.properties['os']) if os_family == 'debian': cmd = 'apt-get update && apt-get upgrade --yes' elif os_family == 'redhat': cmd = 'yum upgrade --assumeyes' else: raise NotImplementedError('upgrade command not configured for distro ' 'from family: {}'.format(os_family)) msg = 'full system upgrade' LOG.debug(msg) image.execute(cmd, description=msg) def run_script(args, image): """Run a script in the target image. @param args: cmdline arguments, must contain --script @param image: cloud_tests.images instance to operate on @return_value: None, may raise errors """ msg = 'run setup image script in target image' LOG.debug(msg) image.run_script(args.script, description=msg) def enable_ppa(args, image): """Enable a ppa in the target image. @param args: cmdline arguments, must contain --ppa @param image: cloud_tests.image instance to operate on @return_value: None, may raise errors """ # ppa only supported on ubuntu (maybe debian?) if image.properties['os'].lower() != 'ubuntu': raise NotImplementedError('enabling a ppa is only available on ubuntu') # add ppa with add-apt-repository and update ppa = 'ppa:{}'.format(args.ppa) msg = 'enable ppa: "{}" in target'.format(ppa) LOG.debug(msg) cmd = 'add-apt-repository --yes {} && apt-get update'.format(ppa) image.execute(cmd, description=msg) def enable_repo(args, image): """Enable a repository in the target image. @param args: cmdline arguments, must contain --repo @param image: cloud_tests.image instance to operate on @return_value: None, may raise errors """ # find enable repo command for the distro os_family = util.get_os_family(image.properties['os']) if os_family == 'debian': cmd = ('echo "{}" >> "/etc/apt/sources.list" '.format(args.repo) + '&& apt-get update') elif os_family == 'centos': cmd = 'yum-config-manager --add-repo="{}"'.format(args.repo) else: raise NotImplementedError('enable repo command not configured for ' 'distro from family: {}'.format(os_family)) msg = 'enable repo: "{}" in target'.format(args.repo) LOG.debug(msg) image.execute(cmd, description=msg) def generate_ssh_keys(data_dir): """Generate SSH keys to be used with image.""" LOG.info('generating SSH keys') filename = os.path.join(data_dir, 'id_rsa') if os.path.exists(filename): c_util.del_file(filename) c_util.subp(['ssh-keygen', '-t', 'rsa', '-b', '4096', '-f', filename, '-P', '', '-C', 'ubuntu@cloud_test'], capture=True) def setup_image(args, image): """Set up image as specified in args. @param args: cmdline arguments @param image: cloud_tests.image instance to operate on @return_value: tuple of results and fail count """ # update the args if necessary for this image overrides = image.setup_overrides LOG.debug('updating args for setup with: %s', overrides) args = util.update_args(args, overrides, preserve_old=True) # mapping of setup cmdline arg name to setup function # represented as a tuple rather than a dict or odict as lookup by name not # needed, and order is important as --script and --upgrade go at the end handlers = ( # arg handler description ('deb', install_deb, 'setup func for --deb, install deb'), ('rpm', install_rpm, 'setup func for --rpm, install rpm'), ('repo', enable_repo, 'setup func for --repo, enable repo'), ('ppa', enable_ppa, 'setup func for --ppa, enable ppa'), ('script', run_script, 'setup func for --script, run script'), ('upgrade', upgrade, 'setup func for --upgrade, upgrade cloud-init'), ('upgrade-full', upgrade_full, 'setup func for --upgrade-full'), ) # determine which setup functions needed calls = [partial(stage.run_single, desc, partial(func, args, image)) for name, func, desc in handlers if getattr(args, name, None)] LOG.info('setting up %s', image) res = stage.run_stage( 'set up for {}'.format(image), calls, continue_after_error=False) LOG.debug('after setup complete, installed cloud-init version is: %s', installed_package_version(image, 'cloud-init')) generate_ssh_keys(args.data_dir) return res # vi: ts=4 expandtab