summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2018-02-02 11:11:36 -0700
committerChad Smith <chad.smith@canonical.com>2018-02-02 11:11:36 -0700
commit78013bc65030421699b5feb66bc8b7a205abfbc0 (patch)
tree2ebf7111129f4aaf8a833ba6d226d4513ed59388 /tests
parent192261fe38a32edbd1f605ba25bbb6f4822a0720 (diff)
parentf7deaf15acf382d62554e2b1d70daa9a9109d542 (diff)
downloadvyos-cloud-init-78013bc65030421699b5feb66bc8b7a205abfbc0.tar.gz
vyos-cloud-init-78013bc65030421699b5feb66bc8b7a205abfbc0.zip
merge from master at 17.2-30-gf7deaf15
Diffstat (limited to 'tests')
-rw-r--r--tests/cloud_tests/__init__.py6
-rw-r--r--tests/cloud_tests/bddeb.py8
-rw-r--r--tests/cloud_tests/collect.py44
-rw-r--r--tests/cloud_tests/config.py4
-rw-r--r--tests/cloud_tests/images/__init__.py10
-rw-r--r--tests/cloud_tests/instances/__init__.py10
-rw-r--r--tests/cloud_tests/platforms.yaml11
-rw-r--r--tests/cloud_tests/platforms/__init__.py22
-rw-r--r--tests/cloud_tests/platforms/base.py27
-rw-r--r--tests/cloud_tests/platforms/ec2/image.py99
-rw-r--r--tests/cloud_tests/platforms/ec2/instance.py132
-rw-r--r--tests/cloud_tests/platforms/ec2/platform.py258
-rw-r--r--tests/cloud_tests/platforms/ec2/snapshot.py66
-rw-r--r--tests/cloud_tests/platforms/images.py (renamed from tests/cloud_tests/images/base.py)3
-rw-r--r--tests/cloud_tests/platforms/instances.py (renamed from tests/cloud_tests/instances/base.py)70
-rw-r--r--tests/cloud_tests/platforms/lxd/image.py (renamed from tests/cloud_tests/images/lxd.py)11
-rw-r--r--tests/cloud_tests/platforms/lxd/instance.py (renamed from tests/cloud_tests/instances/lxd.py)49
-rw-r--r--tests/cloud_tests/platforms/lxd/platform.py (renamed from tests/cloud_tests/platforms/lxd.py)14
-rw-r--r--tests/cloud_tests/platforms/lxd/snapshot.py (renamed from tests/cloud_tests/snapshots/lxd.py)4
-rw-r--r--tests/cloud_tests/platforms/nocloudkvm/image.py (renamed from tests/cloud_tests/images/nocloudkvm.py)21
-rw-r--r--tests/cloud_tests/platforms/nocloudkvm/instance.py (renamed from tests/cloud_tests/instances/nocloudkvm.py)131
-rw-r--r--tests/cloud_tests/platforms/nocloudkvm/platform.py (renamed from tests/cloud_tests/platforms/nocloudkvm.py)20
-rw-r--r--tests/cloud_tests/platforms/nocloudkvm/snapshot.py (renamed from tests/cloud_tests/snapshots/nocloudkvm.py)24
-rw-r--r--tests/cloud_tests/platforms/platforms.py96
-rw-r--r--tests/cloud_tests/platforms/snapshots.py (renamed from tests/cloud_tests/snapshots/base.py)0
-rw-r--r--tests/cloud_tests/releases.yaml32
-rw-r--r--tests/cloud_tests/setup_image.py18
-rw-r--r--tests/cloud_tests/snapshots/__init__.py10
-rw-r--r--tests/cloud_tests/testcases.yaml27
-rw-r--r--tests/cloud_tests/testcases/base.py9
-rw-r--r--tests/cloud_tests/testcases/modules/apt_configure_sources_list.py5
-rw-r--r--tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml6
-rw-r--r--tests/cloud_tests/testcases/modules/ntp_pools.yaml2
-rw-r--r--tests/cloud_tests/testcases/modules/ntp_servers.yaml2
-rw-r--r--tests/cloud_tests/testcases/modules/set_hostname_fqdn.py2
-rw-r--r--tests/cloud_tests/util.py19
-rw-r--r--tests/cloud_tests/verify.py2
-rw-r--r--tests/unittests/test_cli.py105
-rw-r--r--tests/unittests/test_cs_util.py1
-rw-r--r--tests/unittests/test_datasource/test_aliyun.py18
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py22
-rw-r--r--tests/unittests/test_datasource/test_azure.py244
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py13
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py19
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py62
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py20
-rw-r--r--tests/unittests/test_datasource/test_ec2.py8
-rw-r--r--tests/unittests/test_datasource/test_gce.py196
-rw-r--r--tests/unittests/test_datasource/test_maas.py53
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py14
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py235
-rw-r--r--tests/unittests/test_datasource/test_openstack.py12
-rw-r--r--tests/unittests/test_datasource/test_ovf.py111
-rw-r--r--tests/unittests/test_datasource/test_scaleway.py13
-rw-r--r--tests/unittests/test_datasource/test_smartos.py3
-rw-r--r--tests/unittests/test_distros/test_create_users.py7
-rw-r--r--tests/unittests/test_distros/test_netconfig.py52
-rw-r--r--tests/unittests/test_ds_identify.py133
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py3
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py3
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py22
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py10
-rw-r--r--tests/unittests/test_handler/test_handler_zypper_add_repo.py7
-rw-r--r--tests/unittests/test_net.py18
-rw-r--r--tests/unittests/test_reporting.py2
-rw-r--r--tests/unittests/test_runs/test_merge_run.py1
-rw-r--r--tests/unittests/test_runs/test_simple_run.py3
-rw-r--r--tests/unittests/test_templating.py2
-rw-r--r--tests/unittests/test_util.py67
-rw-r--r--tests/unittests/test_vmware/__init__.py0
-rw-r--r--tests/unittests/test_vmware/test_custom_script.py99
-rw-r--r--tests/unittests/test_vmware_config_file.py10
72 files changed, 2313 insertions, 549 deletions
diff --git a/tests/cloud_tests/__init__.py b/tests/cloud_tests/__init__.py
index 98c1d6c7..dd436989 100644
--- a/tests/cloud_tests/__init__.py
+++ b/tests/cloud_tests/__init__.py
@@ -10,6 +10,12 @@ TESTCASES_DIR = os.path.join(BASE_DIR, 'testcases')
TEST_CONF_DIR = os.path.join(BASE_DIR, 'testcases')
TREE_BASE = os.sep.join(BASE_DIR.split(os.sep)[:-2])
+# This domain contains reverse lookups for hostnames that are used.
+# The primary reason is so sudo will return quickly when it attempts
+# to look up the hostname. i9n is just short for 'integration'.
+# see also bug 1730744 for why we had to do this.
+CI_DOMAIN = "i9n.cloud-init.io"
+
def _initialize_logging():
"""Configure logging for cloud_tests."""
diff --git a/tests/cloud_tests/bddeb.py b/tests/cloud_tests/bddeb.py
index fba8a0c7..a6d5069f 100644
--- a/tests/cloud_tests/bddeb.py
+++ b/tests/cloud_tests/bddeb.py
@@ -8,7 +8,7 @@ import tempfile
from cloudinit import util as c_util
from tests.cloud_tests import (config, LOG)
-from tests.cloud_tests import (platforms, images, snapshots, instances)
+from tests.cloud_tests import platforms
from tests.cloud_tests.stage import (PlatformComponent, run_stage, run_single)
pre_reqs = ['devscripts', 'equivs', 'git', 'tar']
@@ -84,18 +84,18 @@ def setup_build(args):
# set up image
LOG.info('acquiring image for os: %s', args.build_os)
img_conf = config.load_os_config(platform.platform_name, args.build_os)
- image_call = partial(images.get_image, platform, img_conf)
+ image_call = partial(platforms.get_image, platform, img_conf)
with PlatformComponent(image_call) as image:
# set up snapshot
- snapshot_call = partial(snapshots.get_snapshot, image)
+ snapshot_call = partial(platforms.get_snapshot, image)
with PlatformComponent(snapshot_call) as snapshot:
# create instance with cloud-config to set it up
LOG.info('creating instance to build deb in')
empty_cloud_config = "#cloud-config\n{}"
instance_call = partial(
- instances.get_instance, snapshot, empty_cloud_config,
+ platforms.get_instance, snapshot, empty_cloud_config,
use_desc='build cloud-init deb')
with PlatformComponent(instance_call) as instance:
diff --git a/tests/cloud_tests/collect.py b/tests/cloud_tests/collect.py
index 71ee7645..5ea88e50 100644
--- a/tests/cloud_tests/collect.py
+++ b/tests/cloud_tests/collect.py
@@ -8,7 +8,7 @@ import os
from cloudinit import util as c_util
from tests.cloud_tests import (config, LOG, setup_image, util)
from tests.cloud_tests.stage import (PlatformComponent, run_stage, run_single)
-from tests.cloud_tests import (platforms, images, snapshots, instances)
+from tests.cloud_tests import platforms
def collect_script(instance, base_dir, script, script_name):
@@ -24,16 +24,29 @@ def collect_script(instance, base_dir, script, script_name):
(out, err, exit) = instance.run_script(
script.encode(), rcs=False,
description='collect: {}'.format(script_name))
+ if err:
+ LOG.debug("collect script %s had stderr: %s", script_name, err)
+ if not isinstance(out, bytes):
+ raise util.PlatformError(
+ "Collection of '%s' returned type %s, expected bytes: %s" %
+ (script_name, type(out), out))
+
c_util.write_file(os.path.join(base_dir, script_name), out)
def collect_console(instance, base_dir):
- LOG.debug('getting console log')
+ """Collect instance console log.
+
+ @param instance: instance to get console log for
+ @param base_dir: directory to write console log to
+ """
+ logfile = os.path.join(base_dir, 'console.log')
+ LOG.debug('getting console log for %s to %s', instance, logfile)
try:
data = instance.console_log()
- except NotImplementedError as e:
- data = 'Not Implemented: %s' % e
- with open(os.path.join(base_dir, 'console.log'), "wb") as fp:
+ except NotImplementedError:
+ data = b'instance.console_log: not implemented'
+ with open(logfile, "wb") as fp:
fp.write(data)
@@ -64,9 +77,9 @@ def collect_test_data(args, snapshot, os_name, test_name):
# skip the testcase with a warning
req_features = test_config.get('required_features', [])
if any(feature not in snapshot.features for feature in req_features):
- LOG.warn('test config %s requires features not supported by image, '
- 'skipping.\nrequired features: %s\nsupported features: %s',
- test_name, req_features, snapshot.features)
+ LOG.warning('test config %s requires features not supported by image, '
+ 'skipping.\nrequired features: %s\nsupported features: %s',
+ test_name, req_features, snapshot.features)
return ({}, 0)
# if there are user data overrides required for this test case, apply them
@@ -77,7 +90,7 @@ def collect_test_data(args, snapshot, os_name, test_name):
# create test instance
component = PlatformComponent(
- partial(instances.get_instance, snapshot, user_data,
+ partial(platforms.get_instance, snapshot, user_data,
block=True, start=False, use_desc=test_name))
LOG.info('collecting test data for test: %s', test_name)
@@ -89,12 +102,11 @@ def collect_test_data(args, snapshot, os_name, test_name):
test_output_dir, script, script_name))
for script_name, script in test_scripts.items()]
- console_log = partial(
- run_single, 'collect console',
- partial(collect_console, instance, test_output_dir))
-
res = run_stage('collect for test: {}'.format(test_name),
- [start_call] + collect_calls + [console_log])
+ [start_call] + collect_calls)
+
+ instance.shutdown()
+ collect_console(instance, test_output_dir)
return res
@@ -108,7 +120,7 @@ def collect_snapshot(args, image, os_name):
"""
res = ({}, 1)
- component = PlatformComponent(partial(snapshots.get_snapshot, image))
+ component = PlatformComponent(partial(platforms.get_snapshot, image))
LOG.debug('creating snapshot for %s', os_name)
with component as snapshot:
@@ -136,7 +148,7 @@ def collect_image(args, platform, os_name):
feature_overrides=args.feature_override)
LOG.debug('os config: %s', os_config)
component = PlatformComponent(
- partial(images.get_image, platform, os_config))
+ partial(platforms.get_image, platform, os_config))
LOG.info('acquiring image for os: %s', os_name)
with component as image:
diff --git a/tests/cloud_tests/config.py b/tests/cloud_tests/config.py
index 52fc2bda..8bd569fd 100644
--- a/tests/cloud_tests/config.py
+++ b/tests/cloud_tests/config.py
@@ -92,7 +92,7 @@ def load_platform_config(platform_name, require_enabled=False):
def load_os_config(platform_name, os_name, require_enabled=False,
- feature_overrides={}):
+ feature_overrides=None):
"""Load configuration for os.
@param platform_name: platform name to load os config for
@@ -101,6 +101,8 @@ def load_os_config(platform_name, os_name, require_enabled=False,
@param feature_overrides: feature flag overrides to merge with features
@return_value: config dict
"""
+ if feature_overrides is None:
+ feature_overrides = {}
main_conf = c_util.read_conf(RELEASES_CONF)
default = main_conf['default_release_config']
image = main_conf['releases'][os_name]
diff --git a/tests/cloud_tests/images/__init__.py b/tests/cloud_tests/images/__init__.py
deleted file mode 100644
index 106c59f3..00000000
--- a/tests/cloud_tests/images/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Main init."""
-
-
-def get_image(platform, config):
- """Get image from platform object using os_name."""
- return platform.get_image(config)
-
-# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/instances/__init__.py b/tests/cloud_tests/instances/__init__.py
deleted file mode 100644
index fc2e9cbc..00000000
--- a/tests/cloud_tests/instances/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Main init."""
-
-
-def get_instance(snapshot, *args, **kwargs):
- """Get instance from snapshot."""
- return snapshot.launch(*args, **kwargs)
-
-# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms.yaml b/tests/cloud_tests/platforms.yaml
index fa4f845e..448aa98d 100644
--- a/tests/cloud_tests/platforms.yaml
+++ b/tests/cloud_tests/platforms.yaml
@@ -6,8 +6,13 @@ default_platform_config:
get_image_timeout: 300
# maximum time to create instance (before waiting for cloud-init)
create_instance_timeout: 60
-
+ private_key: cloud_init_rsa
+ public_key: cloud_init_rsa.pub
platforms:
+ ec2:
+ enabled: true
+ instance-type: t2.micro
+ tag: cii
lxd:
enabled: true
# overrides for image templates
@@ -61,9 +66,5 @@ platforms:
{{ config_get("user.vendor-data", properties.default) }}
nocloud-kvm:
enabled: true
- private_key: id_rsa
- public_key: id_rsa.pub
- ec2: {}
- azure: {}
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/__init__.py b/tests/cloud_tests/platforms/__init__.py
index 3490fe87..a01e51ac 100644
--- a/tests/cloud_tests/platforms/__init__.py
+++ b/tests/cloud_tests/platforms/__init__.py
@@ -2,15 +2,27 @@
"""Main init."""
-from tests.cloud_tests.platforms import lxd
-from tests.cloud_tests.platforms import nocloudkvm
+from .ec2 import platform as ec2
+from .lxd import platform as lxd
+from .nocloudkvm import platform as nocloudkvm
PLATFORMS = {
+ 'ec2': ec2.EC2Platform,
'nocloud-kvm': nocloudkvm.NoCloudKVMPlatform,
'lxd': lxd.LXDPlatform,
}
+def get_image(platform, config):
+ """Get image from platform object using os_name."""
+ return platform.get_image(config)
+
+
+def get_instance(snapshot, *args, **kwargs):
+ """Get instance from snapshot."""
+ return snapshot.launch(*args, **kwargs)
+
+
def get_platform(platform_name, config):
"""Get the platform object for 'platform_name' and init."""
platform_cls = PLATFORMS.get(platform_name)
@@ -18,4 +30,10 @@ def get_platform(platform_name, config):
raise ValueError('invalid platform name: {}'.format(platform_name))
return platform_cls(config)
+
+def get_snapshot(image):
+ """Get snapshot from image."""
+ return image.snapshot()
+
+
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/base.py b/tests/cloud_tests/platforms/base.py
deleted file mode 100644
index 28975368..00000000
--- a/tests/cloud_tests/platforms/base.py
+++ /dev/null
@@ -1,27 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Base platform class."""
-
-
-class Platform(object):
- """Base class for platforms."""
-
- platform_name = None
-
- def __init__(self, config):
- """Set up platform."""
- self.config = config
-
- def get_image(self, img_conf):
- """Get image using specified image configuration.
-
- @param img_conf: configuration for image
- @return_value: cloud_tests.images instance
- """
- raise NotImplementedError
-
- def destroy(self):
- """Clean up platform data."""
- pass
-
-# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/ec2/image.py b/tests/cloud_tests/platforms/ec2/image.py
new file mode 100644
index 00000000..7bedf59d
--- /dev/null
+++ b/tests/cloud_tests/platforms/ec2/image.py
@@ -0,0 +1,99 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""EC2 Image Base Class."""
+
+from ..images import Image
+from .snapshot import EC2Snapshot
+from tests.cloud_tests import LOG
+
+
+class EC2Image(Image):
+ """EC2 backed image."""
+
+ platform_name = 'ec2'
+
+ def __init__(self, platform, config, image_ami):
+ """Set up image.
+
+ @param platform: platform object
+ @param config: image configuration
+ @param image_ami: string of image ami ID
+ """
+ super(EC2Image, self).__init__(platform, config)
+ self._img_instance = None
+ self.image_ami = image_ami
+
+ @property
+ def _instance(self):
+ """Internal use only, returns a running instance"""
+ if not self._img_instance:
+ self._img_instance = self.platform.create_instance(
+ self.properties, self.config, self.features,
+ self.image_ami, user_data=None)
+ self._img_instance.start(wait=True, wait_for_cloud_init=True)
+ return self._img_instance
+
+ def destroy(self):
+ """Delete the instance used to create a custom image."""
+ if self._img_instance:
+ LOG.debug('terminating backing instance %s',
+ self._img_instance.instance.instance_id)
+ self._img_instance.instance.terminate()
+ self._img_instance.instance.wait_until_terminated()
+
+ super(EC2Image, self).destroy()
+
+ def _execute(self, *args, **kwargs):
+ """Execute command in image, modifying image."""
+ self._instance.start(wait=True)
+ return self._instance._execute(*args, **kwargs)
+
+ def push_file(self, local_path, remote_path):
+ """Copy file at 'local_path' to instance at 'remote_path'."""
+ self._instance.start(wait=True)
+ return self._instance.push_file(local_path, remote_path)
+
+ def run_script(self, *args, **kwargs):
+ """Run script in image, modifying image.
+
+ @return_value: script output
+ """
+ self._instance.start(wait=True)
+ return self._instance.run_script(*args, **kwargs)
+
+ def snapshot(self):
+ """Create snapshot of image, block until done.
+
+ Will return base image_ami if no instance has been booted, otherwise
+ will run the clean script, shutdown the instance, create a custom
+ AMI, and use that AMI once available.
+ """
+ if not self._img_instance:
+ return EC2Snapshot(self.platform, self.properties, self.config,
+ self.features, self.image_ami,
+ delete_on_destroy=False)
+
+ if self.config.get('boot_clean_script'):
+ self._img_instance.run_script(self.config.get('boot_clean_script'))
+
+ self._img_instance.shutdown(wait=True)
+
+ LOG.debug('creating custom ami from instance %s',
+ self._img_instance.instance.instance_id)
+ response = self.platform.ec2_client.create_image(
+ Name='%s-%s' % (self.platform.tag, self.image_ami),
+ InstanceId=self._img_instance.instance.instance_id
+ )
+ image_ami_edited = response['ImageId']
+
+ # Create image and wait until it is in the 'available' state
+ image = self.platform.ec2_resource.Image(image_ami_edited)
+ image.wait_until_exists()
+ waiter = self.platform.ec2_client.get_waiter('image_available')
+ waiter.wait(ImageIds=[image.id])
+ image.reload()
+
+ return EC2Snapshot(self.platform, self.properties, self.config,
+ self.features, image_ami_edited)
+
+# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/ec2/instance.py b/tests/cloud_tests/platforms/ec2/instance.py
new file mode 100644
index 00000000..ab6037b1
--- /dev/null
+++ b/tests/cloud_tests/platforms/ec2/instance.py
@@ -0,0 +1,132 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Base EC2 instance."""
+import os
+
+import botocore
+
+from ..instances import Instance
+from tests.cloud_tests import LOG, util
+
+
+class EC2Instance(Instance):
+ """EC2 backed instance."""
+
+ platform_name = "ec2"
+ _ssh_client = None
+
+ def __init__(self, platform, properties, config, features,
+ image_ami, user_data=None):
+ """Set up instance.
+
+ @param platform: platform object
+ @param properties: dictionary of properties
+ @param config: dictionary of configuration values
+ @param features: dictionary of supported feature flags
+ @param image_ami: AWS AMI ID for image to use
+ @param user_data: test user-data to pass to instance
+ """
+ super(EC2Instance, self).__init__(
+ platform, image_ami, properties, config, features)
+
+ self.image_ami = image_ami
+ self.instance = None
+ self.user_data = user_data
+ self.ssh_ip = None
+ self.ssh_port = 22
+ self.ssh_key_file = os.path.join(
+ platform.config['data_dir'], platform.config['private_key'])
+ self.ssh_pubkey_file = os.path.join(
+ platform.config['data_dir'], platform.config['public_key'])
+
+ def console_log(self):
+ """Collect console log from instance.
+
+ The console log is buffered and not always present, therefore
+ may return empty string.
+ """
+ try:
+ # OutputBytes comes from platform._decode_console_output_as_bytes
+ response = self.instance.console_output()
+ return response['OutputBytes']
+ except KeyError:
+ if 'Output' in response:
+ msg = ("'OutputBytes' did not exist in console_output() but "
+ "'Output' did: %s..." % response['Output'][0:128])
+ raise util.PlatformError('console_log', msg)
+ return ('No Console Output [%s]' % self.instance).encode()
+
+ def destroy(self):
+ """Clean up instance."""
+ if self.instance:
+ LOG.debug('destroying instance %s', self.instance.id)
+ self.instance.terminate()
+ self.instance.wait_until_terminated()
+
+ self._ssh_close()
+
+ super(EC2Instance, self).destroy()
+
+ def _execute(self, command, stdin=None, env=None):
+ """Execute command on instance."""
+ env_args = []
+ if env:
+ env_args = ['env'] + ["%s=%s" for k, v in env.items()]
+
+ return self._ssh(['sudo'] + env_args + list(command), stdin=stdin)
+
+ def start(self, wait=True, wait_for_cloud_init=False):
+ """Start instance on EC2 with the platfrom's VPC."""
+ if self.instance:
+ if self.instance.state['Name'] == 'running':
+ return
+
+ LOG.debug('starting instance %s', self.instance.id)
+ self.instance.start()
+ else:
+ LOG.debug('launching instance')
+
+ args = {
+ 'ImageId': self.image_ami,
+ 'InstanceType': self.platform.instance_type,
+ 'KeyName': self.platform.key_name,
+ 'MaxCount': 1,
+ 'MinCount': 1,
+ 'SecurityGroupIds': [self.platform.security_group.id],
+ 'SubnetId': self.platform.subnet.id,
+ 'TagSpecifications': [{
+ 'ResourceType': 'instance',
+ 'Tags': [{
+ 'Key': 'Name', 'Value': self.platform.tag
+ }]
+ }],
+ }
+
+ if self.user_data:
+ args['UserData'] = self.user_data
+
+ try:
+ instances = self.platform.ec2_resource.create_instances(**args)
+ except botocore.exceptions.ClientError as error:
+ error_msg = error.response['Error']['Message']
+ raise util.PlatformError('start', error_msg)
+
+ self.instance = instances[0]
+
+ LOG.debug('instance id: %s', self.instance.id)
+ if wait:
+ self.instance.wait_until_running()
+ self.instance.reload()
+ self.ssh_ip = self.instance.public_ip_address
+ self._wait_for_system(wait_for_cloud_init)
+
+ def shutdown(self, wait=True):
+ """Shutdown instance."""
+ LOG.debug('stopping instance %s', self.instance.id)
+ self.instance.stop()
+
+ if wait:
+ self.instance.wait_until_stopped()
+ self.instance.reload()
+
+# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/ec2/platform.py b/tests/cloud_tests/platforms/ec2/platform.py
new file mode 100644
index 00000000..f188c27b
--- /dev/null
+++ b/tests/cloud_tests/platforms/ec2/platform.py
@@ -0,0 +1,258 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Base EC2 platform."""
+from datetime import datetime
+import os
+
+import boto3
+import botocore
+from botocore import session, handlers
+import base64
+
+from ..platforms import Platform
+from .image import EC2Image
+from .instance import EC2Instance
+from tests.cloud_tests import LOG
+
+
+class EC2Platform(Platform):
+ """EC2 test platform."""
+
+ platform_name = 'ec2'
+ ipv4_cidr = '192.168.1.0/20'
+
+ def __init__(self, config):
+ """Set up platform."""
+ super(EC2Platform, self).__init__(config)
+ # Used for unique VPC, SSH key, and custom AMI generation naming
+ self.tag = '%s-%s' % (
+ config['tag'], datetime.now().strftime('%Y%m%d%H%M%S'))
+ self.instance_type = config['instance-type']
+
+ try:
+ b3session = get_session()
+ self.ec2_client = b3session.client('ec2')
+ self.ec2_resource = b3session.resource('ec2')
+ self.ec2_region = b3session.region_name
+ self.key_name = self._upload_public_key(config)
+ except botocore.exceptions.NoRegionError:
+ raise RuntimeError(
+ 'Please configure default region in $HOME/.aws/config')
+ except botocore.exceptions.NoCredentialsError:
+ raise RuntimeError(
+ 'Please configure ec2 credentials in $HOME/.aws/credentials')
+
+ self.vpc = self._create_vpc()
+ self.internet_gateway = self._create_internet_gateway()
+ self.subnet = self._create_subnet()
+ self.routing_table = self._create_routing_table()
+ self.security_group = self._create_security_group()
+
+ def create_instance(self, properties, config, features,
+ image_ami, user_data=None):
+ """Create an instance
+
+ @param src_img_path: image path to launch from
+ @param properties: image properties
+ @param config: image configuration
+ @param features: image features
+ @param image_ami: string of image ami ID
+ @param user_data: test user-data to pass to instance
+ @return_value: cloud_tests.instances instance
+ """
+ return EC2Instance(self, properties, config, features,
+ image_ami, user_data)
+
+ def destroy(self):
+ """Delete SSH keys, terminate all instances, and delete VPC."""
+ for instance in self.vpc.instances.all():
+ LOG.debug('waiting for instance %s termination', instance.id)
+ instance.terminate()
+ instance.wait_until_terminated()
+
+ if self.key_name:
+ LOG.debug('deleting SSH key %s', self.key_name)
+ self.ec2_client.delete_key_pair(KeyName=self.key_name)
+
+ if self.security_group:
+ LOG.debug('deleting security group %s', self.security_group.id)
+ self.security_group.delete()
+
+ if self.subnet:
+ LOG.debug('deleting subnet %s', self.subnet.id)
+ self.subnet.delete()
+
+ if self.routing_table:
+ LOG.debug('deleting routing table %s', self.routing_table.id)
+ self.routing_table.delete()
+
+ if self.internet_gateway:
+ LOG.debug('deleting internet gateway %s', self.internet_gateway.id)
+ self.internet_gateway.detach_from_vpc(VpcId=self.vpc.id)
+ self.internet_gateway.delete()
+
+ if self.vpc:
+ LOG.debug('deleting vpc %s', self.vpc.id)
+ self.vpc.delete()
+
+ def get_image(self, img_conf):
+ """Get image using specified image configuration.
+
+ Hard coded for 'amd64' based images.
+
+ @param img_conf: configuration for image
+ @return_value: cloud_tests.images instance
+ """
+ if img_conf['root-store'] == 'ebs':
+ root_store = 'ssd'
+ elif img_conf['root-store'] == 'instance-store':
+ root_store = 'instance'
+ else:
+ raise RuntimeError('Unknown root-store type: %s' %
+ (img_conf['root-store']))
+
+ filters = [
+ 'arch=%s' % 'amd64',
+ 'endpoint=https://ec2.%s.amazonaws.com' % self.ec2_region,
+ 'region=%s' % self.ec2_region,
+ 'release=%s' % img_conf['release'],
+ 'root_store=%s' % root_store,
+ 'virt=hvm',
+ ]
+
+ LOG.debug('finding image using streams')
+ image = self._query_streams(img_conf, filters)
+
+ try:
+ image_ami = image['id']
+ except KeyError:
+ raise RuntimeError('No images found for %s!' % img_conf['release'])
+
+ LOG.debug('found image: %s', image_ami)
+ image = EC2Image(self, img_conf, image_ami)
+ return image
+
+ def _create_internet_gateway(self):
+ """Create Internet Gateway and assign to VPC."""
+ LOG.debug('creating internet gateway')
+ internet_gateway = self.ec2_resource.create_internet_gateway()
+ internet_gateway.attach_to_vpc(VpcId=self.vpc.id)
+ self._tag_resource(internet_gateway)
+
+ return internet_gateway
+
+ def _create_routing_table(self):
+ """Update default routing table with internet gateway.
+
+ This sets up internet access between the VPC via the internet gateway
+ by configuring routing tables for IPv4 and IPv6.
+ """
+ LOG.debug('creating routing table')
+ route_table = self.vpc.create_route_table()
+ route_table.create_route(DestinationCidrBlock='0.0.0.0/0',
+ GatewayId=self.internet_gateway.id)
+ route_table.create_route(DestinationIpv6CidrBlock='::/0',
+ GatewayId=self.internet_gateway.id)
+ route_table.associate_with_subnet(SubnetId=self.subnet.id)
+ self._tag_resource(route_table)
+
+ return route_table
+
+ def _create_security_group(self):
+ """Enables ingress to default VPC security group."""
+ LOG.debug('creating security group')
+ security_group = self.vpc.create_security_group(
+ GroupName=self.tag, Description='integration test security group')
+ security_group.authorize_ingress(
+ IpProtocol='-1', FromPort=-1, ToPort=-1, CidrIp='0.0.0.0/0')
+ self._tag_resource(security_group)
+
+ return security_group
+
+ def _create_subnet(self):
+ """Generate IPv4 and IPv6 subnets for use."""
+ ipv6_cidr = self.vpc.ipv6_cidr_block_association_set[0][
+ 'Ipv6CidrBlock'][:-2] + '64'
+
+ LOG.debug('creating subnet with following ranges:')
+ LOG.debug('ipv4: %s', self.ipv4_cidr)
+ LOG.debug('ipv6: %s', ipv6_cidr)
+ subnet = self.vpc.create_subnet(CidrBlock=self.ipv4_cidr,
+ Ipv6CidrBlock=ipv6_cidr)
+ modify_subnet = subnet.meta.client.modify_subnet_attribute
+ modify_subnet(SubnetId=subnet.id,
+ MapPublicIpOnLaunch={'Value': True})
+ self._tag_resource(subnet)
+
+ return subnet
+
+ def _create_vpc(self):
+ """Setup AWS EC2 VPC or return existing VPC."""
+ LOG.debug('creating new vpc')
+ try:
+ vpc = self.ec2_resource.create_vpc(
+ CidrBlock=self.ipv4_cidr,
+ AmazonProvidedIpv6CidrBlock=True)
+ except botocore.exceptions.ClientError as e:
+ raise RuntimeError(e)
+
+ vpc.wait_until_available()
+ self._tag_resource(vpc)
+
+ return vpc
+
+ def _tag_resource(self, resource):
+ """Tag a resource with the specified tag.
+
+ This makes finding and deleting resources specific to this testing
+ much easier to find.
+
+ @param resource: resource to tag
+ """
+ tag = {
+ 'Key': 'Name',
+ 'Value': self.tag
+ }
+ resource.create_tags(Tags=[tag])
+
+ def _upload_public_key(self, config):
+ """Generate random name and upload SSH key with that name.
+
+ @param config: platform config
+ @return: string of ssh key name
+ """
+ key_file = os.path.join(config['data_dir'], config['public_key'])
+ with open(key_file, 'r') as file:
+ public_key = file.read().strip('\n')
+
+ LOG.debug('uploading SSH key %s', self.tag)
+ self.ec2_client.import_key_pair(KeyName=self.tag,
+ PublicKeyMaterial=public_key)
+
+ return self.tag
+
+
+def _decode_console_output_as_bytes(parsed, **kwargs):
+ """Provide console output as bytes in OutputBytes.
+
+ For this to be useful, the session has to have had the
+ decode_console_output handler unregistered already.
+
+ https://github.com/boto/botocore/issues/1351 ."""
+ if 'Output' not in parsed:
+ return
+ orig = parsed['Output']
+ handlers.decode_console_output(parsed, **kwargs)
+ parsed['OutputBytes'] = base64.b64decode(orig)
+
+
+def get_session():
+ mysess = session.get_session()
+ mysess.unregister('after-call.ec2.GetConsoleOutput',
+ handlers.decode_console_output)
+ mysess.register('after-call.ec2.GetConsoleOutput',
+ _decode_console_output_as_bytes)
+ return boto3.Session(botocore_session=mysess)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/platforms/ec2/snapshot.py b/tests/cloud_tests/platforms/ec2/snapshot.py
new file mode 100644
index 00000000..2c48cb54
--- /dev/null
+++ b/tests/cloud_tests/platforms/ec2/snapshot.py
@@ -0,0 +1,66 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Base EC2 snapshot."""
+
+from ..snapshots import Snapshot
+from tests.cloud_tests import LOG
+
+
+class EC2Snapshot(Snapshot):
+ """EC2 image copy backed snapshot."""
+
+ platform_name = 'ec2'
+
+ def __init__(self, platform, properties, config, features, image_ami,
+ delete_on_destroy=True):
+ """Set up snapshot.
+
+ @param platform: platform object
+ @param properties: image properties
+ @param config: image config
+ @param features: supported feature flags
+ @param image_ami: string of image ami ID
+ @param delete_on_destroy: boolean to delete on destroy
+ """
+ super(EC2Snapshot, self).__init__(
+ platform, properties, config, features)
+
+ self.image_ami = image_ami
+ self.delete_on_destroy = delete_on_destroy
+
+ def destroy(self):
+ """Deregister the backing AMI."""
+ if self.delete_on_destroy:
+ image = self.platform.ec2_resource.Image(self.image_ami)
+ snapshot_id = image.block_device_mappings[0]['Ebs']['SnapshotId']
+
+ LOG.debug('removing custom ami %s', self.image_ami)
+ self.platform.ec2_client.deregister_image(ImageId=self.image_ami)
+
+ LOG.debug('removing custom snapshot %s', snapshot_id)
+ self.platform.ec2_client.delete_snapshot(SnapshotId=snapshot_id)
+
+ def launch(self, user_data, meta_data=None, block=True, start=True,
+ use_desc=None):
+ """Launch instance.
+
+ @param user_data: user-data for the instance
+ @param meta_data: meta_data for the instance
+ @param block: wait until instance is created
+ @param start: start instance and wait until fully started
+ @param use_desc: string of test name
+ @return_value: an Instance
+ """
+ if meta_data is not None:
+ raise ValueError("metadata not supported on Ec2")
+
+ instance = self.platform.create_instance(
+ self.properties, self.config, self.features,
+ self.image_ami, user_data)
+
+ if start:
+ instance.start()
+
+ return instance
+
+# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/images/base.py b/tests/cloud_tests/platforms/images.py
index d503108a..557a5cf6 100644
--- a/tests/cloud_tests/images/base.py
+++ b/tests/cloud_tests/platforms/images.py
@@ -26,7 +26,8 @@ class Image(TargetBase):
@property
def properties(self):
"""{} containing: 'arch', 'os', 'version', 'release'."""
- raise NotImplementedError
+ return {k: self.config[k]
+ for k in ('arch', 'os', 'release', 'version')}
@property
def features(self):
diff --git a/tests/cloud_tests/instances/base.py b/tests/cloud_tests/platforms/instances.py
index 8c59d62c..3bad021f 100644
--- a/tests/cloud_tests/instances/base.py
+++ b/tests/cloud_tests/platforms/instances.py
@@ -1,14 +1,21 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""Base instance."""
+import time
+
+import paramiko
+from paramiko.ssh_exception import (
+ BadHostKeyException, AuthenticationException, SSHException)
from ..util import TargetBase
+from tests.cloud_tests import LOG, util
class Instance(TargetBase):
"""Base instance object."""
platform_name = None
+ _ssh_client = None
def __init__(self, platform, name, properties, config, features):
"""Set up instance.
@@ -26,6 +33,11 @@ class Instance(TargetBase):
self.features = features
self._tmp_count = 0
+ self.ssh_ip = None
+ self.ssh_port = None
+ self.ssh_key_file = None
+ self.ssh_username = 'ubuntu'
+
def console_log(self):
"""Instance console.
@@ -47,7 +59,63 @@ class Instance(TargetBase):
def destroy(self):
"""Clean up instance."""
- pass
+ self._ssh_close()
+
+ def _ssh(self, command, stdin=None):
+ """Run a command via SSH."""
+ client = self._ssh_connect()
+
+ cmd = util.shell_pack(command)
+ fp_in, fp_out, fp_err = client.exec_command(cmd)
+ channel = fp_in.channel
+
+ if stdin is not None:
+ fp_in.write(stdin)
+ fp_in.close()
+
+ channel.shutdown_write()
+ rc = channel.recv_exit_status()
+
+ return (fp_out.read(), fp_err.read(), rc)
+
+ def _ssh_close(self):
+ if self._ssh_client:
+ try:
+ self._ssh_client.close()
+ except SSHException:
+ LOG.warning('Failed to close SSH connection.')
+ self._ssh_client = None
+
+ def _ssh_connect(self):
+ """Connect via SSH."""
+ if self._ssh_client:
+ return self._ssh_client
+
+ if not self.ssh_ip or not self.ssh_port:
+ raise ValueError
+
+ client = paramiko.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ private_key = paramiko.RSAKey.from_private_key_file(self.ssh_key_file)
+
+ retries = 30
+ while retries:
+ try:
+ client.connect(username=self.ssh_username,
+ hostname=self.ssh_ip, port=self.ssh_port,
+ pkey=private_key, banner_timeout=30)
+ self._ssh_client = client
+ return client
+ except (ConnectionRefusedError, AuthenticationException,
+ BadHostKeyException, ConnectionResetError, SSHException,
+ OSError) as e:
+ retries -= 1
+ time.sleep(10)
+
+ ssh_cmd = 'Failed ssh connection to %s@%s:%s after 300 seconds' % (
+ self.ssh_username, self.ssh_ip, self.ssh_port
+ )
+ raise util.InTargetExecuteError(b'', b'', 1, ssh_cmd, 'ssh')
def _wait_for_system(self, wait_for_cloud_init):
"""Wait until system has fully booted and cloud-init has finished.
diff --git a/tests/cloud_tests/images/lxd.py b/tests/cloud_tests/platforms/lxd/image.py
index 5caeba41..b5de1f52 100644
--- a/tests/cloud_tests/images/lxd.py
+++ b/tests/cloud_tests/platforms/lxd/image.py
@@ -6,13 +6,13 @@ import os
import shutil
import tempfile
+from ..images import Image
+from .snapshot import LXDSnapshot
from cloudinit import util as c_util
-from tests.cloud_tests.images import base
-from tests.cloud_tests.snapshots import lxd as lxd_snapshot
from tests.cloud_tests import util
-class LXDImage(base.Image):
+class LXDImage(Image):
"""LXD backed image."""
platform_name = "lxd"
@@ -182,9 +182,8 @@ class LXDImage(base.Image):
instance.run_script(self.config.get('boot_clean_script'))
# freeze current instance and return snapshot
instance.freeze()
- return lxd_snapshot.LXDSnapshot(
- self.platform, self.properties, self.config,
- self.features, instance)
+ return LXDSnapshot(self.platform, self.properties, self.config,
+ self.features, instance)
def destroy(self):
"""Clean up data associated with image."""
diff --git a/tests/cloud_tests/instances/lxd.py b/tests/cloud_tests/platforms/lxd/instance.py
index 3b035d86..d2d2a1fd 100644
--- a/tests/cloud_tests/instances/lxd.py
+++ b/tests/cloud_tests/platforms/lxd/instance.py
@@ -2,14 +2,16 @@
"""Base LXD instance."""
-from . import base
-
import os
import shutil
from tempfile import mkdtemp
+from cloudinit.util import subp, ProcessExecutionError
+
+from ..instances import Instance
-class LXDInstance(base.Instance):
+
+class LXDInstance(Instance):
"""LXD container backed instance."""
platform_name = "lxd"
@@ -29,6 +31,7 @@ class LXDInstance(base.Instance):
platform, name, properties, config, features)
self.tmpd = mkdtemp(prefix="%s-%s" % (type(self).__name__, name))
self._setup_console_log()
+ self.name = name
@property
def pylxd_container(self):
@@ -55,33 +58,25 @@ class LXDInstance(base.Instance):
if env is None:
env = {}
- if stdin is not None:
- # pylxd does not support input to execute.
- # https://github.com/lxc/pylxd/issues/244
- #
- # The solution here is write a tmp file in the container
- # and then execute a shell that sets it standard in to
- # be from that file, removes it, and calls the comand.
- tmpf = self.tmpfile()
- self.write_data(tmpf, stdin)
- ncmd = 'exec <"{tmpf}"; rm -f "{tmpf}"; exec "$@"'
- command = (['sh', '-c', ncmd.format(tmpf=tmpf), 'stdinhack'] +
- list(command))
+ env_args = []
+ if env:
+ env_args = ['env'] + ["%s=%s" for k, v in env.items()]
# ensure instance is running and execute the command
self.start()
- # execute returns a ContainerExecuteResult, named tuple
- # (exit_code, stdout, stderr)
- res = self.pylxd_container.execute(command, environment=env)
-
- # get out, exit and err from pylxd return
- if not hasattr(res, 'exit_code'):
- # pylxd 2.1.3 and earlier only return out and err, no exit
- raise RuntimeError(
- "No 'exit_code' in pylxd.container.execute return.\n"
- "pylxd > 2.2 is required.")
-
- return res.stdout, res.stderr, res.exit_code
+
+ # Use cmdline client due to https://github.com/lxc/pylxd/issues/268
+ exit_code = 0
+ try:
+ stdout, stderr = subp(
+ ['lxc', 'exec', self.name, '--'] + env_args + list(command),
+ data=stdin, decode=False)
+ except ProcessExecutionError as e:
+ exit_code = e.exit_code
+ stdout = e.stdout
+ stderr = e.stderr
+
+ return stdout, stderr, exit_code
def read_data(self, remote_path, decode=False):
"""Read data from instance filesystem.
diff --git a/tests/cloud_tests/platforms/lxd.py b/tests/cloud_tests/platforms/lxd/platform.py
index ead0955b..6a016929 100644
--- a/tests/cloud_tests/platforms/lxd.py
+++ b/tests/cloud_tests/platforms/lxd/platform.py
@@ -4,15 +4,15 @@
from pylxd import (Client, exceptions)
-from tests.cloud_tests.images import lxd as lxd_image
-from tests.cloud_tests.instances import lxd as lxd_instance
-from tests.cloud_tests.platforms import base
+from ..platforms import Platform
+from .image import LXDImage
+from .instance import LXDInstance
from tests.cloud_tests import util
DEFAULT_SSTREAMS_SERVER = "https://images.linuxcontainers.org:8443"
-class LXDPlatform(base.Platform):
+class LXDPlatform(Platform):
"""LXD test platform."""
platform_name = 'lxd'
@@ -33,7 +33,7 @@ class LXDPlatform(base.Platform):
pylxd_image = self.client.images.create_from_simplestreams(
img_conf.get('sstreams_server', DEFAULT_SSTREAMS_SERVER),
img_conf['alias'])
- image = lxd_image.LXDImage(self, img_conf, pylxd_image)
+ image = LXDImage(self, img_conf, pylxd_image)
if img_conf.get('override_templates', False):
image.update_templates(self.config.get('template_overrides', {}),
self.config.get('template_files', {}))
@@ -69,8 +69,8 @@ class LXDPlatform(base.Platform):
'source': ({'type': 'image', 'fingerprint': image} if image else
{'type': 'copy', 'source': container})
}, wait=block)
- return lxd_instance.LXDInstance(self, container.name, properties,
- config, features, container)
+ return LXDInstance(self, container.name, properties, config, features,
+ container)
def container_exists(self, container_name):
"""Check if container with name 'container_name' exists.
diff --git a/tests/cloud_tests/snapshots/lxd.py b/tests/cloud_tests/platforms/lxd/snapshot.py
index 39c55c5e..b524644f 100644
--- a/tests/cloud_tests/snapshots/lxd.py
+++ b/tests/cloud_tests/platforms/lxd/snapshot.py
@@ -2,10 +2,10 @@
"""Base LXD snapshot."""
-from tests.cloud_tests.snapshots import base
+from ..snapshots import Snapshot
-class LXDSnapshot(base.Snapshot):
+class LXDSnapshot(Snapshot):
"""LXD image copy backed snapshot."""
platform_name = "lxd"
diff --git a/tests/cloud_tests/images/nocloudkvm.py b/tests/cloud_tests/platforms/nocloudkvm/image.py
index 8678b07f..bc2b6e75 100644
--- a/tests/cloud_tests/images/nocloudkvm.py
+++ b/tests/cloud_tests/platforms/nocloudkvm/image.py
@@ -8,11 +8,11 @@ import os
import shutil
import tempfile
-from tests.cloud_tests.images import base
-from tests.cloud_tests.snapshots import nocloudkvm as nocloud_kvm_snapshot
+from ..images import Image
+from .snapshot import NoCloudKVMSnapshot
-class NoCloudKVMImage(base.Image):
+class NoCloudKVMImage(Image):
"""NoCloud KVM backed image."""
platform_name = "nocloud-kvm"
@@ -35,16 +35,6 @@ class NoCloudKVMImage(base.Image):
super(NoCloudKVMImage, self).__init__(platform, config)
- @property
- def properties(self):
- """Dictionary containing: 'arch', 'os', 'version', 'release'."""
- return {
- 'arch': self.config['arch'],
- 'os': self.config['family'],
- 'release': self.config['release'],
- 'version': self.config['version'],
- }
-
def _execute(self, command, stdin=None, env=None):
"""Execute command in image, modifying image."""
return self.mount_image_callback(command, stdin=stdin, env=env)
@@ -71,9 +61,8 @@ class NoCloudKVMImage(base.Image):
if not self._img_path:
raise RuntimeError()
- return nocloud_kvm_snapshot.NoCloudKVMSnapshot(
- self.platform, self.properties, self.config,
- self.features, self._img_path)
+ return NoCloudKVMSnapshot(self.platform, self.properties, self.config,
+ self.features, self._img_path)
def destroy(self):
"""Unset path to signal image is no longer used.
diff --git a/tests/cloud_tests/instances/nocloudkvm.py b/tests/cloud_tests/platforms/nocloudkvm/instance.py
index bc06a79e..932dc0fa 100644
--- a/tests/cloud_tests/instances/nocloudkvm.py
+++ b/tests/cloud_tests/platforms/nocloudkvm/instance.py
@@ -2,15 +2,17 @@
"""Base NoCloud KVM instance."""
+import copy
import os
-import paramiko
import socket
import subprocess
import time
+import uuid
+from ..instances import Instance
+from cloudinit.atomic_helper import write_json
from cloudinit import util as c_util
-from tests.cloud_tests.instances import base
-from tests.cloud_tests import util
+from tests.cloud_tests import LOG, util
# This domain contains reverse lookups for hostnames that are used.
# The primary reason is so sudo will return quickly when it attempts
@@ -19,11 +21,10 @@ from tests.cloud_tests import util
CI_DOMAIN = "i9n.cloud-init.io"
-class NoCloudKVMInstance(base.Instance):
+class NoCloudKVMInstance(Instance):
"""NoCloud KVM backed instance."""
platform_name = "nocloud-kvm"
- _ssh_client = None
def __init__(self, platform, name, image_path, properties, config,
features, user_data, meta_data):
@@ -36,18 +37,72 @@ class NoCloudKVMInstance(base.Instance):
@param config: dictionary of configuration values
@param features: dictionary of supported feature flags
"""
+ super(NoCloudKVMInstance, self).__init__(
+ platform, name, properties, config, features
+ )
+
self.user_data = user_data
- self.meta_data = meta_data
- self.ssh_key_file = os.path.join(platform.config['data_dir'],
- platform.config['private_key'])
+ if meta_data:
+ meta_data = copy.deepcopy(meta_data)
+ else:
+ meta_data = {}
+
+ if 'instance-id' in meta_data:
+ iid = meta_data['instance-id']
+ else:
+ iid = str(uuid.uuid1())
+ meta_data['instance-id'] = iid
+
+ self.instance_id = iid
+ self.ssh_key_file = os.path.join(
+ platform.config['data_dir'], platform.config['private_key'])
+ self.ssh_pubkey_file = os.path.join(
+ platform.config['data_dir'], platform.config['public_key'])
+
+ self.ssh_pubkey = None
+ if self.ssh_pubkey_file:
+ with open(self.ssh_pubkey_file, "r") as fp:
+ self.ssh_pubkey = fp.read().rstrip('\n')
+
+ if not meta_data.get('public-keys'):
+ meta_data['public-keys'] = []
+ meta_data['public-keys'].append(self.ssh_pubkey)
+
+ self.ssh_ip = '127.0.0.1'
self.ssh_port = None
self.pid = None
self.pid_file = None
self.console_file = None
self.disk = image_path
+ self.meta_data = meta_data
- super(NoCloudKVMInstance, self).__init__(
- platform, name, properties, config, features)
+ def shutdown(self, wait=True):
+ """Shutdown instance."""
+
+ if self.pid:
+ # This relies on _execute which uses sudo over ssh. The ssh
+ # connection would get killed before sudo exited, so ignore errors.
+ cmd = ['shutdown', 'now']
+ try:
+ self._execute(cmd)
+ except util.InTargetExecuteError:
+ pass
+ self._ssh_close()
+
+ if wait:
+ LOG.debug("Executed shutdown. waiting on pid %s to end",
+ self.pid)
+ time_for_shutdown = 120
+ give_up_at = time.time() + time_for_shutdown
+ pid_file_path = '/proc/%s' % self.pid
+ msg = ("pid %s did not exit in %s seconds after shutdown." %
+ (self.pid, time_for_shutdown))
+ while True:
+ if not os.path.exists(pid_file_path):
+ break
+ if time.time() > give_up_at:
+ raise util.PlatformError("shutdown", msg)
+ self.pid = None
def destroy(self):
"""Clean up instance."""
@@ -61,9 +116,7 @@ class NoCloudKVMInstance(base.Instance):
os.remove(self.pid_file)
self.pid = None
- if self._ssh_client:
- self._ssh_client.close()
- self._ssh_client = None
+ self._ssh_close()
super(NoCloudKVMInstance, self).destroy()
@@ -72,17 +125,21 @@ class NoCloudKVMInstance(base.Instance):
if env:
env_args = ['env'] + ["%s=%s" for k, v in env.items()]
- return self.ssh(['sudo'] + env_args + list(command), stdin=stdin)
+ return self._ssh(['sudo'] + env_args + list(command), stdin=stdin)
def generate_seed(self, tmpdir):
"""Generate nocloud seed from user-data"""
seed_file = os.path.join(tmpdir, '%s_seed.img' % self.name)
user_data_file = os.path.join(tmpdir, '%s_user_data' % self.name)
+ meta_data_file = os.path.join(tmpdir, '%s_meta_data' % self.name)
with open(user_data_file, "w") as ud_file:
ud_file.write(self.user_data)
- c_util.subp(['cloud-localds', seed_file, user_data_file])
+ # meta-data can be yaml, but more easily pretty printed with json
+ write_json(meta_data_file, self.meta_data)
+ c_util.subp(['cloud-localds', seed_file, user_data_file,
+ meta_data_file])
return seed_file
@@ -94,50 +151,6 @@ class NoCloudKVMInstance(base.Instance):
s.close()
return num
- def ssh(self, command, stdin=None):
- """Run a command via SSH."""
- client = self._ssh_connect()
-
- cmd = util.shell_pack(command)
- try:
- fp_in, fp_out, fp_err = client.exec_command(cmd)
- channel = fp_in.channel
- if stdin is not None:
- fp_in.write(stdin)
- fp_in.close()
-
- channel.shutdown_write()
- rc = channel.recv_exit_status()
- return (fp_out.read(), fp_err.read(), rc)
- except paramiko.SSHException as e:
- raise util.InTargetExecuteError(
- b'', b'', -1, command, self.name, reason=e)
-
- def _ssh_connect(self, hostname='localhost', username='ubuntu',
- banner_timeout=120, retry_attempts=30):
- """Connect via SSH."""
- if self._ssh_client:
- return self._ssh_client
-
- private_key = paramiko.RSAKey.from_private_key_file(self.ssh_key_file)
- client = paramiko.SSHClient()
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- while retry_attempts:
- try:
- client.connect(hostname=hostname, username=username,
- port=self.ssh_port, pkey=private_key,
- banner_timeout=banner_timeout)
- self._ssh_client = client
- return client
- except (paramiko.SSHException, TypeError):
- time.sleep(1)
- retry_attempts = retry_attempts - 1
-
- error_desc = 'Failed command to: %s@%s:%s' % (username, hostname,
- self.ssh_port)
- raise util.InTargetExecuteError('', '', -1, 'ssh connect',
- self.name, error_desc)
-
def start(self, wait=True, wait_for_cloud_init=False):
"""Start instance."""
tmpdir = self.platform.config['data_dir']
diff --git a/tests/cloud_tests/platforms/nocloudkvm.py b/tests/cloud_tests/platforms/nocloudkvm/platform.py
index 76cd83ad..a7e6f5de 100644
--- a/tests/cloud_tests/platforms/nocloudkvm.py
+++ b/tests/cloud_tests/platforms/nocloudkvm/platform.py
@@ -9,18 +9,22 @@ from simplestreams import mirrors
from simplestreams import objectstores
from simplestreams import util as s_util
+from ..platforms import Platform
+from .image import NoCloudKVMImage
+from .instance import NoCloudKVMInstance
from cloudinit import util as c_util
-from tests.cloud_tests.images import nocloudkvm as nocloud_kvm_image
-from tests.cloud_tests.instances import nocloudkvm as nocloud_kvm_instance
-from tests.cloud_tests.platforms import base
from tests.cloud_tests import util
-class NoCloudKVMPlatform(base.Platform):
+class NoCloudKVMPlatform(Platform):
"""NoCloud KVM test platform."""
platform_name = 'nocloud-kvm'
+ def __init__(self, config):
+ """Set up platform."""
+ super(NoCloudKVMPlatform, self).__init__(config)
+
def get_image(self, img_conf):
"""Get image using specified image configuration.
@@ -62,7 +66,7 @@ class NoCloudKVMPlatform(base.Platform):
"Multiple images found in '%s': %s" % (search_d,
' '.join(images)))
- image = nocloud_kvm_image.NoCloudKVMImage(self, img_conf, images[0])
+ image = NoCloudKVMImage(self, img_conf, images[0])
return image
def create_instance(self, properties, config, features,
@@ -83,9 +87,7 @@ class NoCloudKVMPlatform(base.Platform):
c_util.subp(['qemu-img', 'create', '-f', 'qcow2',
'-b', src_img_path, img_path])
- return nocloud_kvm_instance.NoCloudKVMInstance(self, name, img_path,
- properties, config,
- features, user_data,
- meta_data)
+ return NoCloudKVMInstance(self, name, img_path, properties, config,
+ features, user_data, meta_data)
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/snapshots/nocloudkvm.py b/tests/cloud_tests/platforms/nocloudkvm/snapshot.py
index 21e908da..2dae3590 100644
--- a/tests/cloud_tests/snapshots/nocloudkvm.py
+++ b/tests/cloud_tests/platforms/nocloudkvm/snapshot.py
@@ -5,10 +5,10 @@ import os
import shutil
import tempfile
-from tests.cloud_tests.snapshots import base
+from ..snapshots import Snapshot
-class NoCloudKVMSnapshot(base.Snapshot):
+class NoCloudKVMSnapshot(Snapshot):
"""NoCloud KVM image copy backed snapshot."""
platform_name = "nocloud-kvm"
@@ -41,10 +41,6 @@ class NoCloudKVMSnapshot(base.Snapshot):
@param use_desc: description of snapshot instance use
@return_value: an Instance
"""
- key_file = os.path.join(self.platform.config['data_dir'],
- self.platform.config['public_key'])
- user_data = self.inject_ssh_key(user_data, key_file)
-
instance = self.platform.create_instance(
self.properties, self.config, self.features,
self._image_path, image_desc=str(self), use_desc=use_desc,
@@ -55,22 +51,6 @@ class NoCloudKVMSnapshot(base.Snapshot):
return instance
- def inject_ssh_key(self, user_data, key_file):
- """Inject the authorized key into the user_data."""
- with open(key_file) as f:
- value = f.read()
-
- key = 'ssh_authorized_keys:'
- value = ' - %s' % value.strip()
- user_data = user_data.split('\n')
- if key in user_data:
- user_data.insert(user_data.index(key) + 1, '%s' % value)
- else:
- user_data.insert(-1, '%s' % key)
- user_data.insert(-1, '%s' % value)
-
- return '\n'.join(user_data)
-
def destroy(self):
"""Clean up snapshot data."""
shutil.rmtree(self._workd)
diff --git a/tests/cloud_tests/platforms/platforms.py b/tests/cloud_tests/platforms/platforms.py
new file mode 100644
index 00000000..1542b3be
--- /dev/null
+++ b/tests/cloud_tests/platforms/platforms.py
@@ -0,0 +1,96 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Base platform class."""
+import os
+
+from simplestreams import filters, mirrors
+from simplestreams import util as s_util
+
+from cloudinit import util as c_util
+
+
+class Platform(object):
+ """Base class for platforms."""
+
+ platform_name = None
+
+ def __init__(self, config):
+ """Set up platform."""
+ self.config = config
+ self._generate_ssh_keys(config['data_dir'])
+
+ def get_image(self, img_conf):
+ """Get image using specified image configuration.
+
+ @param img_conf: configuration for image
+ @return_value: cloud_tests.images instance
+ """
+ raise NotImplementedError
+
+ def destroy(self):
+ """Clean up platform data."""
+ pass
+
+ def _generate_ssh_keys(self, data_dir):
+ """Generate SSH keys to be used with image."""
+ filename = os.path.join(data_dir, self.config['private_key'])
+
+ if os.path.exists(filename):
+ c_util.del_file(filename)
+
+ c_util.subp(['ssh-keygen', '-t', 'rsa', '-b', '4096',
+ '-f', filename, '-P', '',
+ '-C', 'ubuntu@cloud_test'],
+ capture=True)
+
+ @staticmethod
+ def _query_streams(img_conf, img_filter):
+ """Query streams for latest image given a specific filter.
+
+ @param img_conf: configuration for image
+ @param filters: array of filters as strings format 'key=value'
+ @return: dictionary with latest image information or empty
+ """
+ def policy(content, path):
+ return s_util.read_signed(content, keyring=img_conf['keyring'])
+
+ (url, path) = s_util.path_from_mirror_url(img_conf['mirror_url'], None)
+ smirror = mirrors.UrlMirrorReader(url, policy=policy)
+
+ config = {'max_items': 1, 'filters': filters.get_filters(img_filter)}
+ tmirror = FilterMirror(config)
+ tmirror.sync(smirror, path)
+
+ try:
+ return tmirror.json_entries[0]
+ except IndexError:
+ raise RuntimeError('no images found with filter: %s' % img_filter)
+
+
+class FilterMirror(mirrors.BasicMirrorWriter):
+ """Taken from sstream-query to return query result as json array."""
+
+ def __init__(self, config=None):
+ super(FilterMirror, self).__init__(config=config)
+ if config is None:
+ config = {}
+ self.config = config
+ self.filters = config.get('filters', [])
+ self.json_entries = []
+
+ def load_products(self, path=None, content_id=None):
+ return {'content_id': content_id, 'products': {}}
+
+ def filter_item(self, data, src, target, pedigree):
+ return filters.filter_item(self.filters, data, src, pedigree)
+
+ def insert_item(self, data, src, target, pedigree, contentsource):
+ # src and target are top level products:1.0
+ # data is src['products'][ped[0]]['versions'][ped[1]]['items'][ped[2]]
+ # contentsource is a ContentSource if 'path' exists in data or None
+ data = s_util.products_exdata(src, pedigree)
+ if 'path' in data:
+ data.update({'item_url': contentsource.url})
+ self.json_entries.append(data)
+
+# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/snapshots/base.py b/tests/cloud_tests/platforms/snapshots.py
index 94328982..94328982 100644
--- a/tests/cloud_tests/snapshots/base.py
+++ b/tests/cloud_tests/platforms/snapshots.py
diff --git a/tests/cloud_tests/releases.yaml b/tests/cloud_tests/releases.yaml
index e5933802..d8bc170f 100644
--- a/tests/cloud_tests/releases.yaml
+++ b/tests/cloud_tests/releases.yaml
@@ -27,10 +27,14 @@ default_release_config:
# features groups and additional feature settings
feature_groups: []
features: {}
- nocloud-kvm:
mirror_url: https://cloud-images.ubuntu.com/daily
- mirror_dir: '/srv/citest/nocloud-kvm'
+ mirror_dir: '/srv/citest/images'
keyring: /usr/share/keyrings/ubuntu-cloudimage-keyring.gpg
+ ec2:
+ # Choose from: [ebs, instance-store]
+ root-store: ebs
+ boot_timeout: 300
+ nocloud-kvm:
setup_overrides: null
override_templates: false
# lxd specific default configuration options
@@ -128,7 +132,7 @@ releases:
enabled: true
release: bionic
version: 18.04
- family: ubuntu
+ os: ubuntu
feature_groups:
- base
- debian_base
@@ -144,7 +148,7 @@ releases:
enabled: true
release: artful
version: 17.10
- family: ubuntu
+ os: ubuntu
feature_groups:
- base
- debian_base
@@ -154,29 +158,13 @@ releases:
alias: artful
setup_overrides: null
override_templates: false
- zesty:
- # EOL: Jan 2018
- default:
- enabled: true
- release: zesty
- version: 17.04
- family: ubuntu
- feature_groups:
- - base
- - debian_base
- - ubuntu_specific
- lxd:
- sstreams_server: https://cloud-images.ubuntu.com/daily
- alias: zesty
- setup_overrides: null
- override_templates: false
xenial:
# EOL: Apr 2021
default:
enabled: true
release: xenial
version: 16.04
- family: ubuntu
+ os: ubuntu
feature_groups:
- base
- debian_base
@@ -192,7 +180,7 @@ releases:
enabled: true
release: trusty
version: 14.04
- family: ubuntu
+ os: ubuntu
feature_groups:
- base
- debian_base
diff --git a/tests/cloud_tests/setup_image.py b/tests/cloud_tests/setup_image.py
index 179f40db..6d242115 100644
--- a/tests/cloud_tests/setup_image.py
+++ b/tests/cloud_tests/setup_image.py
@@ -5,7 +5,6 @@
from functools import partial
import os
-from cloudinit import util as c_util
from tests.cloud_tests import LOG
from tests.cloud_tests import stage, util
@@ -192,20 +191,6 @@ def enable_repo(args, image):
image.execute(cmd, description=msg)
-def generate_ssh_keys(data_dir):
- """Generate SSH keys to be used with image."""
- LOG.info('generating SSH keys')
- filename = os.path.join(data_dir, 'id_rsa')
-
- if os.path.exists(filename):
- c_util.del_file(filename)
-
- c_util.subp(['ssh-keygen', '-t', 'rsa', '-b', '4096',
- '-f', filename, '-P', '',
- '-C', 'ubuntu@cloud_test'],
- capture=True)
-
-
def setup_image(args, image):
"""Set up image as specified in args.
@@ -239,9 +224,6 @@ def setup_image(args, image):
LOG.info('setting up %s', image)
res = stage.run_stage(
'set up for {}'.format(image), calls, continue_after_error=False)
- LOG.debug('after setup complete, installed cloud-init version is: %s',
- installed_package_version(image, 'cloud-init'))
- generate_ssh_keys(args.data_dir)
return res
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/snapshots/__init__.py b/tests/cloud_tests/snapshots/__init__.py
deleted file mode 100644
index 93a54f5e..00000000
--- a/tests/cloud_tests/snapshots/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Main init."""
-
-
-def get_snapshot(image):
- """Get snapshot from image."""
- return image.snapshot()
-
-# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases.yaml b/tests/cloud_tests/testcases.yaml
index 7183e017..8e0fb62f 100644
--- a/tests/cloud_tests/testcases.yaml
+++ b/tests/cloud_tests/testcases.yaml
@@ -7,22 +7,37 @@ base_test_data:
#cloud-config
collect_scripts:
cloud-init.log: |
- #!/bin/bash
+ #!/bin/sh
cat /var/log/cloud-init.log
cloud-init-output.log: |
- #!/bin/bash
+ #!/bin/sh
cat /var/log/cloud-init-output.log
instance-id: |
- #!/bin/bash
+ #!/bin/sh
cat /run/cloud-init/.instance-id
result.json: |
- #!/bin/bash
+ #!/bin/sh
cat /run/cloud-init/result.json
status.json: |
- #!/bin/bash
+ #!/bin/sh
cat /run/cloud-init/status.json
cloud-init-version: |
- #!/bin/bash
+ #!/bin/sh
dpkg-query -W -f='${Version}' cloud-init
+ system.journal.gz: |
+ #!/bin/sh
+ [ -d /run/systemd ] || { echo "not systemd."; exit 0; }
+ fail() { echo "ERROR:" "$@" 1>&2; exit 1; }
+ journal=""
+ for d in /run/log/journal /var/log/journal; do
+ for f in $d/*/system.journal; do
+ [ -f "$f" ] || continue
+ [ -z "$journal" ] ||
+ fail "multiple journal found: $f $journal."
+ journal="$f"
+ done
+ done
+ [ -f "$journal" ] || fail "no journal file found."
+ gzip --to-stdout "$journal"
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/base.py b/tests/cloud_tests/testcases/base.py
index 1706f59b..20e95955 100644
--- a/tests/cloud_tests/testcases/base.py
+++ b/tests/cloud_tests/testcases/base.py
@@ -12,7 +12,8 @@ from cloudinit import util as c_util
class CloudTestCase(unittest.TestCase):
"""Base test class for verifiers."""
- data = None
+ # data gets populated in get_suite.setUpClass
+ data = {}
conf = None
_cloud_config = None
@@ -29,12 +30,14 @@ class CloudTestCase(unittest.TestCase):
raise AssertionError('Key "{}" not in cloud config'.format(name))
return self.cloud_config[name]
- def get_data_file(self, name):
+ def get_data_file(self, name, decode=True):
"""Get data file failing test if it is not present."""
if name not in self.data:
raise AssertionError('File "{}" missing from collect data'
.format(name))
- return self.data[name]
+ if not decode:
+ return self.data[name]
+ return self.data[name].decode('utf-8')
def get_instance_id(self):
"""Get recorded instance id."""
diff --git a/tests/cloud_tests/testcases/modules/apt_configure_sources_list.py b/tests/cloud_tests/testcases/modules/apt_configure_sources_list.py
index 129d2264..cf84e056 100644
--- a/tests/cloud_tests/testcases/modules/apt_configure_sources_list.py
+++ b/tests/cloud_tests/testcases/modules/apt_configure_sources_list.py
@@ -10,6 +10,11 @@ class TestAptconfigureSourcesList(base.CloudTestCase):
def test_sources_list(self):
"""Test sources.list includes sources."""
out = self.get_data_file('sources.list')
+
+ # Verify we have 6 entires
+ self.assertEqual(6, len(out.rstrip().split('\n')))
+
+ # Verify the keys generated the list correctly
self.assertRegex(out, r'deb http:\/\/archive.ubuntu.com\/ubuntu '
'[a-z].* main restricted')
self.assertRegex(out, r'deb-src http:\/\/archive.ubuntu.com\/ubuntu '
diff --git a/tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml b/tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml
index 143cb080..87e470c1 100644
--- a/tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml
+++ b/tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml
@@ -7,6 +7,12 @@ required_features:
cloud_config: |
#cloud-config
apt:
+ primary:
+ - arches: [default]
+ uri: http://archive.ubuntu.com/ubuntu
+ security:
+ - arches: [default]
+ uri: http://security.ubuntu.com/ubuntu
sources_list: |
deb $MIRROR $RELEASE main restricted
deb-src $MIRROR $RELEASE main restricted
diff --git a/tests/cloud_tests/testcases/modules/ntp_pools.yaml b/tests/cloud_tests/testcases/modules/ntp_pools.yaml
index 3a93faa2..d490b228 100644
--- a/tests/cloud_tests/testcases/modules/ntp_pools.yaml
+++ b/tests/cloud_tests/testcases/modules/ntp_pools.yaml
@@ -26,6 +26,6 @@ collect_scripts:
grep '^pool' /etc/ntp.conf
ntpq_servers: |
#!/bin/sh
- ntpq -p -w
+ ntpq -p -w -n
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/modules/ntp_servers.yaml b/tests/cloud_tests/testcases/modules/ntp_servers.yaml
index d59d45a8..6b13b70e 100644
--- a/tests/cloud_tests/testcases/modules/ntp_servers.yaml
+++ b/tests/cloud_tests/testcases/modules/ntp_servers.yaml
@@ -22,6 +22,6 @@ collect_scripts:
grep '^server' /etc/ntp.conf
ntpq_servers: |
#!/bin/sh
- ntpq -p -w
+ ntpq -p -w -n
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/modules/set_hostname_fqdn.py b/tests/cloud_tests/testcases/modules/set_hostname_fqdn.py
index eb6f0650..a405b30b 100644
--- a/tests/cloud_tests/testcases/modules/set_hostname_fqdn.py
+++ b/tests/cloud_tests/testcases/modules/set_hostname_fqdn.py
@@ -1,7 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""cloud-init Integration Test Verify Script."""
-from tests.cloud_tests.instances.nocloudkvm import CI_DOMAIN
+from tests.cloud_tests import CI_DOMAIN
from tests.cloud_tests.testcases import base
diff --git a/tests/cloud_tests/util.py b/tests/cloud_tests/util.py
index c5cd6974..6ff285e7 100644
--- a/tests/cloud_tests/util.py
+++ b/tests/cloud_tests/util.py
@@ -262,7 +262,7 @@ def shell_safe(cmd):
out = subprocess.check_output(
["getopt", "--shell", "sh", "--options", "", "--", "--"] + list(cmd))
# out contains ' -- <data>\n'. drop the ' -- ' and the '\n'
- return out[4:-1].decode()
+ return out.decode()[4:-1]
def shell_pack(cmd):
@@ -321,9 +321,9 @@ class TargetBase(object):
rcs = (0,)
if description:
- LOG.debug('Executing "%s"', description)
+ LOG.debug('executing "%s"', description)
else:
- LOG.debug("Executing command: %s", shell_quote(command))
+ LOG.debug("executing command: %s", shell_quote(command))
out, err, rc = self._execute(command=command, stdin=stdin, env=env)
@@ -447,6 +447,19 @@ class InTargetExecuteError(c_util.ProcessExecutionError):
reason=reason)
+class PlatformError(IOError):
+ """Error type for platform errors."""
+
+ default_desc = 'unexpected error in platform.'
+
+ def __init__(self, operation, description=None):
+ """Init error and parent error class."""
+ description = description if description else self.default_desc
+
+ message = '%s: %s' % (operation, description)
+ IOError.__init__(self, message)
+
+
class TempDir(object):
"""Configurable temporary directory like tempfile.TemporaryDirectory."""
diff --git a/tests/cloud_tests/verify.py b/tests/cloud_tests/verify.py
index fc1efcfc..2a9fd520 100644
--- a/tests/cloud_tests/verify.py
+++ b/tests/cloud_tests/verify.py
@@ -29,7 +29,7 @@ def verify_data(base_dir, tests):
data = {}
test_dir = os.path.join(base_dir, test_name)
for script_name in os.listdir(test_dir):
- with open(os.path.join(test_dir, script_name), 'r') as fp:
+ with open(os.path.join(test_dir, script_name), 'rb') as fp:
data[script_name] = fp.read()
# get test suite and launch tests
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
index fccbbd23..0c0f427a 100644
--- a/tests/unittests/test_cli.py
+++ b/tests/unittests/test_cli.py
@@ -1,16 +1,21 @@
# This file is part of cloud-init. See LICENSE file for license information.
+from collections import namedtuple
+import os
import six
+from cloudinit.cmd import main as cli
from cloudinit.tests import helpers as test_helpers
+from cloudinit.util import load_file, load_json
-from cloudinit.cmd import main as cli
mock = test_helpers.mock
class TestCLI(test_helpers.FilesystemMockingTestCase):
+ with_logs = True
+
def setUp(self):
super(TestCLI, self).setUp()
self.stderr = six.StringIO()
@@ -24,6 +29,76 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
except SystemExit as e:
return e.code
+ def test_status_wrapper_errors_on_invalid_name(self):
+ """status_wrapper will error when the name parameter is not valid.
+
+ Valid name values are only init and modules.
+ """
+ tmpd = self.tmp_dir()
+ data_d = self.tmp_path('data', tmpd)
+ link_d = self.tmp_path('link', tmpd)
+ FakeArgs = namedtuple('FakeArgs', ['action', 'local', 'mode'])
+
+ def myaction():
+ raise Exception('Should not call myaction')
+
+ myargs = FakeArgs(('doesnotmatter', myaction), False, 'bogusmode')
+ with self.assertRaises(ValueError) as cm:
+ cli.status_wrapper('init1', myargs, data_d, link_d)
+ self.assertEqual('unknown name: init1', str(cm.exception))
+ self.assertNotIn('Should not call myaction', self.logs.getvalue())
+
+ def test_status_wrapper_errors_on_invalid_modes(self):
+ """status_wrapper will error if a parameter combination is invalid."""
+ tmpd = self.tmp_dir()
+ data_d = self.tmp_path('data', tmpd)
+ link_d = self.tmp_path('link', tmpd)
+ FakeArgs = namedtuple('FakeArgs', ['action', 'local', 'mode'])
+
+ def myaction():
+ raise Exception('Should not call myaction')
+
+ myargs = FakeArgs(('modules_name', myaction), False, 'bogusmode')
+ with self.assertRaises(ValueError) as cm:
+ cli.status_wrapper('modules', myargs, data_d, link_d)
+ self.assertEqual(
+ "Invalid cloud init mode specified 'modules-bogusmode'",
+ str(cm.exception))
+ self.assertNotIn('Should not call myaction', self.logs.getvalue())
+
+ def test_status_wrapper_init_local_writes_fresh_status_info(self):
+ """When running in init-local mode, status_wrapper writes status.json.
+
+ Old status and results artifacts are also removed.
+ """
+ tmpd = self.tmp_dir()
+ data_d = self.tmp_path('data', tmpd)
+ link_d = self.tmp_path('link', tmpd)
+ status_link = self.tmp_path('status.json', link_d)
+ # Write old artifacts which will be removed or updated.
+ for _dir in data_d, link_d:
+ test_helpers.populate_dir(
+ _dir, {'status.json': 'old', 'result.json': 'old'})
+
+ FakeArgs = namedtuple('FakeArgs', ['action', 'local', 'mode'])
+
+ def myaction(name, args):
+ # Return an error to watch status capture them
+ return 'SomeDatasource', ['an error']
+
+ myargs = FakeArgs(('ignored_name', myaction), True, 'bogusmode')
+ cli.status_wrapper('init', myargs, data_d, link_d)
+ # No errors reported in status
+ status_v1 = load_json(load_file(status_link))['v1']
+ self.assertEqual(['an error'], status_v1['init-local']['errors'])
+ self.assertEqual('SomeDatasource', status_v1['datasource'])
+ self.assertFalse(
+ os.path.exists(self.tmp_path('result.json', data_d)),
+ 'unexpected result.json found')
+ self.assertFalse(
+ os.path.exists(self.tmp_path('result.json', link_d)),
+ 'unexpected result.json link found')
+
def test_no_arguments_shows_usage(self):
exit_code = self._call_main()
self.assertIn('usage: cloud-init', self.stderr.getvalue())
@@ -45,8 +120,8 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
"""All known subparsers are represented in the cloud-int help doc."""
self._call_main()
error = self.stderr.getvalue()
- expected_subcommands = ['analyze', 'init', 'modules', 'single',
- 'dhclient-hook', 'features', 'devel']
+ expected_subcommands = ['analyze', 'clean', 'devel', 'dhclient-hook',
+ 'features', 'init', 'modules', 'single']
for subcommand in expected_subcommands:
self.assertIn(subcommand, error)
@@ -76,9 +151,11 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
self.patchStdoutAndStderr(stdout=stdout)
expected_errors = [
- 'usage: cloud-init analyze', 'usage: cloud-init collect-logs',
- 'usage: cloud-init devel']
- conditional_subcommands = ['analyze', 'collect-logs', 'devel']
+ 'usage: cloud-init analyze', 'usage: cloud-init clean',
+ 'usage: cloud-init collect-logs', 'usage: cloud-init devel',
+ 'usage: cloud-init status']
+ conditional_subcommands = [
+ 'analyze', 'clean', 'collect-logs', 'devel', 'status']
# The cloud-init entrypoint calls main without passing sys_argv
for subcommand in conditional_subcommands:
with mock.patch('sys.argv', ['cloud-init', subcommand, '-h']):
@@ -106,6 +183,22 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
self._call_main(['cloud-init', 'collect-logs', '-h'])
self.assertIn('usage: cloud-init collect-log', stdout.getvalue())
+ def test_clean_subcommand_parser(self):
+ """The subcommand cloud-init clean calls the subparser."""
+ # Provide -h param to clean to avoid having to mock behavior.
+ stdout = six.StringIO()
+ self.patchStdoutAndStderr(stdout=stdout)
+ self._call_main(['cloud-init', 'clean', '-h'])
+ self.assertIn('usage: cloud-init clean', stdout.getvalue())
+
+ def test_status_subcommand_parser(self):
+ """The subcommand cloud-init status calls the subparser."""
+ # Provide -h param to clean to avoid having to mock behavior.
+ stdout = six.StringIO()
+ self.patchStdoutAndStderr(stdout=stdout)
+ self._call_main(['cloud-init', 'status', '-h'])
+ self.assertIn('usage: cloud-init status', stdout.getvalue())
+
def test_devel_subcommand_parser(self):
"""The subcommand cloud-init devel calls the correct subparser."""
self._call_main(['cloud-init', 'devel'])
diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py
index ee88520d..2a1095b9 100644
--- a/tests/unittests/test_cs_util.py
+++ b/tests/unittests/test_cs_util.py
@@ -35,6 +35,7 @@ class CepkoMock(Cepko):
# touched the underlying Cepko class methods.
class CepkoResultTests(test_helpers.TestCase):
def setUp(self):
+ self.c = Cepko()
raise test_helpers.SkipTest('This test is completely useless')
def test_getitem(self):
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py
index 82ee9714..4fa9616b 100644
--- a/tests/unittests/test_datasource/test_aliyun.py
+++ b/tests/unittests/test_datasource/test_aliyun.py
@@ -47,6 +47,9 @@ def register_mock_metaserver(base_url, data):
elif isinstance(body, list):
register(base_url.rstrip('/'), '\n'.join(body) + '\n')
elif isinstance(body, dict):
+ if not body:
+ register(base_url.rstrip('/') + '/', 'not found',
+ status_code=404)
vals = []
for k, v in body.items():
if isinstance(v, (str, list)):
@@ -67,7 +70,7 @@ class TestAliYunDatasource(test_helpers.HttprettyTestCase):
super(TestAliYunDatasource, self).setUp()
cfg = {'datasource': {'AliYun': {'timeout': '1', 'max_wait': '1'}}}
distro = {}
- paths = helpers.Paths({})
+ paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.ds = ay.DataSourceAliYun(cfg, distro, paths)
self.metadata_address = self.ds.metadata_urls[0]
@@ -91,9 +94,22 @@ class TestAliYunDatasource(test_helpers.HttprettyTestCase):
self.metadata_address,
self.ds.min_metadata_version, 'user-data')
+ # EC2 provides an instance-identity document which must return 404 here
+ # for this test to pass.
+ @property
+ def default_identity(self):
+ return {}
+
+ @property
+ def identity_url(self):
+ return os.path.join(self.metadata_address,
+ self.ds.min_metadata_version,
+ 'dynamic', 'instance-identity')
+
def regist_default_server(self):
register_mock_metaserver(self.metadata_url, self.default_metadata)
register_mock_metaserver(self.userdata_url, self.default_userdata)
+ register_mock_metaserver(self.identity_url, self.default_identity)
def _test_get_data(self):
self.assertEqual(self.ds.metadata, self.default_metadata)
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index a4dfb540..3253f3ad 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -18,7 +18,7 @@ import tempfile
from cloudinit import helpers
from cloudinit import util
-from cloudinit.tests.helpers import TestCase
+from cloudinit.tests.helpers import CiTestCase
import cloudinit.sources.DataSourceAltCloud as dsac
@@ -97,7 +97,7 @@ def _dmi_data(expected):
return _data
-class TestGetCloudType(TestCase):
+class TestGetCloudType(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_cloud_type()
'''
@@ -143,14 +143,16 @@ class TestGetCloudType(TestCase):
self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
-class TestGetDataCloudInfoFile(TestCase):
+class TestGetDataCloudInfoFile(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
With a contrived CLOUD_INFO_FILE
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.cloud_info_file = tempfile.mkstemp()[1]
self.dmi_data = util.read_dmi_data
dsac.CLOUD_INFO_FILE = self.cloud_info_file
@@ -207,14 +209,16 @@ class TestGetDataCloudInfoFile(TestCase):
self.assertEqual(False, dsrc.get_data())
-class TestGetDataNoCloudInfoFile(TestCase):
+class TestGetDataNoCloudInfoFile(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
Without a CLOUD_INFO_FILE
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.dmi_data = util.read_dmi_data
dsac.CLOUD_INFO_FILE = \
'no such file'
@@ -254,7 +258,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
self.assertEqual(False, dsrc.get_data())
-class TestUserDataRhevm(TestCase):
+class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
@@ -320,7 +324,7 @@ class TestUserDataRhevm(TestCase):
self.assertEqual(False, dsrc.user_data_rhevm())
-class TestUserDataVsphere(TestCase):
+class TestUserDataVsphere(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_vsphere()
'''
@@ -368,7 +372,7 @@ class TestUserDataVsphere(TestCase):
self.assertEqual(1, m_mount_cb.call_count)
-class TestReadUserDataCallback(TestCase):
+class TestReadUserDataCallback(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.read_user_data_callback()
'''
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 7cb1812a..254e9876 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -5,20 +5,19 @@ from cloudinit.util import b64e, decode_binary, load_file, write_file
from cloudinit.sources import DataSourceAzure as dsaz
from cloudinit.util import find_freebsd_part
from cloudinit.util import get_path_dev_freebsd
-
+from cloudinit.version import version_string as vs
from cloudinit.tests.helpers import (CiTestCase, TestCase, populate_dir, mock,
ExitStack, PY26, SkipTest)
import crypt
import os
-import shutil
import stat
-import tempfile
import xml.etree.ElementTree as ET
import yaml
-def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
+def construct_valid_ovf_env(data=None, pubkeys=None,
+ userdata=None, platform_settings=None):
if data is None:
data = {'HostName': 'FOOHOST'}
if pubkeys is None:
@@ -38,9 +37,9 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
"""
for key, dval in data.items():
if isinstance(dval, dict):
- val = dval.get('text')
- attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
- if k != 'text'])
+ val = dict(dval).get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v
+ in dict(dval).items() if k != 'text'])
else:
val = dval
attrs = ""
@@ -68,10 +67,12 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<KmsServerHostname>kms.core.windows.net</KmsServerHostname>
<ProvisionGuestAgent>false</ProvisionGuestAgent>
- <GuestAgentPackageName i:nil="true" />
- </PlatformSettings></wa:PlatformSettingsSection>
-</Environment>
- """
+ <GuestAgentPackageName i:nil="true" />"""
+ if platform_settings:
+ for k, v in platform_settings.items():
+ content += "<%s>%s</%s>\n" % (k, v, k)
+ content += """</PlatformSettings></wa:PlatformSettingsSection>
+</Environment>"""
return content
@@ -84,11 +85,11 @@ class TestAzureDataSource(CiTestCase):
super(TestAzureDataSource, self).setUp()
if PY26:
raise SkipTest("Does not work on python 2.6")
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
self.patches = ExitStack()
@@ -176,6 +177,7 @@ scbus-1 on xpt0 bus 0
(dsaz, 'get_hostname', mock.MagicMock()),
(dsaz, 'set_hostname', mock.MagicMock()),
(dsaz, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (dsaz.util, 'which', lambda x: True),
(dsaz.util, 'read_dmi_data', mock.MagicMock(
side_effect=_dmi_mocks)),
(dsaz.util, 'wait_for_files', mock.MagicMock(
@@ -642,7 +644,9 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertEqual(netconfig, expected_config)
-class TestAzureBounce(TestCase):
+class TestAzureBounce(CiTestCase):
+
+ with_logs = True
def mock_out_azure_moving_parts(self):
self.patches.enter_context(
@@ -655,6 +659,8 @@ class TestAzureBounce(TestCase):
self.patches.enter_context(
mock.patch.object(dsaz, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
+ self.patches.enter_context(
+ mock.patch.object(dsaz.util, 'which', lambda x: True))
def _dmi_mocks(key):
if key == 'system-uuid':
@@ -669,10 +675,10 @@ class TestAzureBounce(TestCase):
def setUp(self):
super(TestAzureBounce, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.tmp_dir()
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.patches = ExitStack()
self.mock_out_azure_moving_parts()
@@ -714,21 +720,24 @@ class TestAzureBounce(TestCase):
def test_disabled_bounce_does_not_change_hostname(self):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_disabled_bounce_does_not_perform_bounce(
self, perform_hostname_bounce):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
def test_same_hostname_does_not_change_hostname(self):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -737,7 +746,8 @@ class TestAzureBounce(TestCase):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -751,6 +761,22 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, perform_hostname_bounce.call_count)
+ def test_bounce_skipped_on_ifupdown_absent(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ dsrc = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg),
+ agent_command=['not', '__builtin__'])
+ patch_path = 'cloudinit.sources.DataSourceAzure.util.which'
+ with mock.patch(patch_path) as m_which:
+ m_which.return_value = None
+ ret = self._get_and_setup(dsrc)
+ self.assertEqual([mock.call('ifup')], m_which.call_args_list)
+ self.assertTrue(ret)
+ self.assertIn(
+ "Skipping network bounce: ifupdown utils aren't present.",
+ self.logs.getvalue())
+
def test_different_hostnames_sets_hostname(self):
expected_hostname = 'azure-expected-host-name'
self.get_hostname.return_value = 'default-host-name'
@@ -815,9 +841,7 @@ class TestAzureBounce(TestCase):
self.assertEqual(hostname, bounce_env['hostname'])
self.assertEqual(old_hostname, bounce_env['old_hostname'])
- def test_default_bounce_command_used_by_default(self):
- cmd = 'default-bounce-command'
- dsaz.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ def test_default_bounce_command_ifup_used_by_default(self):
cfg = {'hostname_bounce': {'policy': 'force'}}
data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
dsrc = self._get_ds(data, agent_command=['not', '__builtin__'])
@@ -825,7 +849,8 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, self.subp.call_count)
bounce_args = self.subp.call_args[1]['args']
- self.assertEqual(cmd, bounce_args)
+ self.assertEqual(
+ dsaz.BOUNCE_COMMAND_IFUP, bounce_args)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_set_hostname_option_can_disable_bounce(
@@ -895,9 +920,6 @@ class TestCanDevBeReformatted(CiTestCase):
setattr(self, sattr, patcher.start())
self.addCleanup(patcher.stop)
- def setUp(self):
- super(TestCanDevBeReformatted, self).setUp()
-
def patchup(self, devs):
bypath = {}
for path, data in devs.items():
@@ -952,14 +974,14 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda3': {'num': 3},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
def test_no_partitions_is_false(self):
"""A disk with no partitions can not be formatted."""
self.patchup({'/dev/sda': {}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not partitioned", msg.lower())
def test_two_partitions_not_ntfs_false(self):
@@ -971,7 +993,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_two_partitions_ntfs_populated_false(self):
@@ -984,7 +1006,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['secret.txt']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_two_partitions_ntfs_empty_is_true(self):
@@ -996,7 +1018,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_not_ntfs_false(self):
@@ -1007,7 +1029,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'zfs'},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_one_partition_ntfs_populated_false(self):
@@ -1019,7 +1041,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['file1.txt', 'file2.exe']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_one_partition_ntfs_empty_is_true(self):
@@ -1030,7 +1052,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self):
@@ -1042,7 +1064,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['dataloss_warning_readme.txt']}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_through_realpath_is_true(self):
@@ -1057,7 +1079,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb1'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_three_partition_through_realpath_is_false(self):
@@ -1076,7 +1098,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb3'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
@@ -1088,4 +1110,146 @@ class TestAzureNetExists(CiTestCase):
self.assertTrue(hasattr(dsaz, "DataSourceAzureNet"))
+@mock.patch('cloudinit.sources.DataSourceAzure.util.subp')
+@mock.patch.object(dsaz, 'get_hostname')
+@mock.patch.object(dsaz, 'set_hostname')
+class TestAzureDataSourcePreprovisioning(CiTestCase):
+
+ def setUp(self):
+ super(TestAzureDataSourcePreprovisioning, self).setUp()
+ tmp = self.tmp_dir()
+ self.waagent_d = self.tmp_path('/var/lib/waagent', tmp)
+ self.paths = helpers.Paths({'cloud_dir': tmp})
+ dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+
+ def test_read_azure_ovf_with_true_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag if the proper setting is present."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "True"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertTrue(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_with_false_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag to false if the proper setting is false."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "False"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_without_flag(self, *args):
+ """The read_azure_ovf method should not set the
+ PreprovisionedVM cfg flag."""
+ content = construct_valid_ovf_env()
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test_poll_imds_returns_ovf_env(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _poll_imds method should return the ovf_env.xml."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text="ovf")
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(len(dsa._poll_imds()) > 0)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', timeout=60.0,
+ url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test__reprovision_calls__poll_imds(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _reprovision method should call poll IMDS."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
+ 'unknown-245': '624c3620'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ hostname = "myhost"
+ username = "myuser"
+ odata = {'HostName': hostname, 'UserName': username}
+ content = construct_valid_ovf_env(data=odata)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text=content)
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ md, ud, cfg, d = dsa._reprovision()
+ self.assertEqual(md['local-hostname'], hostname)
+ self.assertEqual(cfg['system_info']['default_user']['name'], username)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', timeout=60.0, url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_true_cfg(self, isfile, write_f, *args):
+ """The _should_reprovision method should return true with config
+ flag present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'PreprovisionedVm': True}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_file_existing(self, isfile, *args):
+ """The _should_reprovision method should return True if the sentinal
+ exists."""
+ isfile.return_value = True
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'preprovisionedvm': False}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_returns_false(self, isfile, *args):
+ """The _should_reprovision method should return False
+ if config and sentinal are not present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertFalse(dsa._should_reprovision((None, None, {}, None)))
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index e4c59907..f6a59b6b 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -3,6 +3,7 @@
import copy
from cloudinit.cs_utils import Cepko
+from cloudinit import helpers
from cloudinit import sources
from cloudinit.sources import DataSourceCloudSigma
@@ -38,10 +39,12 @@ class CepkoMock(Cepko):
return self
-class DataSourceCloudSigmaTest(test_helpers.TestCase):
+class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
self.datasource.get_data()
@@ -85,7 +88,8 @@ class DataSourceCloudSigmaTest(test_helpers.TestCase):
def test_lack_of_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
@@ -94,7 +98,8 @@ class DataSourceCloudSigmaTest(test_helpers.TestCase):
def test_lack_of_cloudinit_key_in_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]["cloudinit"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index 96144b64..d6d2d6b2 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -33,6 +33,7 @@ class TestCloudStackPasswordFetching(CiTestCase):
self.patches.enter_context(mock.patch(
mod_name + '.dhcp.networkd_get_option_from_leases',
get_networkd_server_address))
+ self.tmp = self.tmp_dir()
def _set_password_server_response(self, response_string):
subp = mock.MagicMock(return_value=(response_string, ''))
@@ -43,26 +44,30 @@ class TestCloudStackPasswordFetching(CiTestCase):
def test_empty_password_doesnt_create_config(self):
self._set_password_server_response('')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual({}, ds.get_config_obj())
def test_saved_password_doesnt_create_config(self):
self._set_password_server_response('saved_password')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual({}, ds.get_config_obj())
def test_password_sets_password(self):
password = 'SekritSquirrel'
self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual(password, ds.get_config_obj()['password'])
def test_bad_request_doesnt_stop_ds_from_working(self):
self._set_password_server_response('bad_request')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
self.assertTrue(ds.get_data())
def assertRequestTypesSent(self, subp, expected_request_types):
@@ -77,14 +82,16 @@ class TestCloudStackPasswordFetching(CiTestCase):
def test_valid_response_means_password_marked_as_saved(self):
password = 'SekritSquirrel'
subp = self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertRequestTypesSent(subp,
['send_my_password', 'saved_password'])
def _check_password_not_saved_for(self, response_string):
subp = self._set_password_server_response(response_string)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertRequestTypesSent(subp, ['send_my_password'])
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 237c189b..68400f22 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -3,9 +3,6 @@
from copy import copy, deepcopy
import json
import os
-import shutil
-import six
-import tempfile
from cloudinit import helpers
from cloudinit.net import eni
@@ -15,7 +12,7 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from cloudinit.tests.helpers import TestCase, ExitStack, mock
+from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -223,12 +220,11 @@ CFG_DRIVE_FILES_V2 = {
'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
-class TestConfigDriveDataSource(TestCase):
+class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -462,6 +458,12 @@ class TestConfigDriveDataSource(TestCase):
self.assertEqual(["/dev/vdb3"],
ds.find_candidate_devs())
+ # Verify that uppercase labels are also found.
+ devs_with_answers = {"TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=CONFIG-2": ["/dev/vdb"]}
+ self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
+
finally:
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
@@ -469,31 +471,27 @@ class TestConfigDriveDataSource(TestCase):
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
-class TestNetJson(TestCase):
+class TestNetJson(CiTestCase):
def setUp(self):
super(TestNetJson, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
self.maxDiff = None
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertIsNotNone(myds.network_json)
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
network_config = openstack.convert_net_json(NETWORK_DATA,
known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
@@ -598,11 +596,10 @@ class TestNetJson(TestCase):
self.assertEqual(out_data, conv_data)
-class TestConvertNetworkData(TestCase):
+class TestConvertNetworkData(CiTestCase):
def setUp(self):
super(TestConvertNetworkData, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
def _getnames_in_config(self, ncfg):
return set([n['name'] for n in ncfg['config']
@@ -724,14 +721,18 @@ class TestConvertNetworkData(TestCase):
self.assertEqual(expected, config_name2mac)
-def cfg_ds_from_dir(seed_d):
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- cfg_ds.seed_dir = seed_d
+def cfg_ds_from_dir(base_d, files=None):
+ run = os.path.join(base_d, "run")
+ os.mkdir(run)
+ cfg_ds = ds.DataSourceConfigDrive(
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': run}))
+ cfg_ds.seed_dir = os.path.join(base_d, "seed")
+ if files:
+ populate_dir(cfg_ds.seed_dir, files)
cfg_ds.known_macs = KNOWN_MACS.copy()
if not cfg_ds.get_data():
raise RuntimeError("Data source did not extract itself from"
- " seed directory %s" % seed_d)
+ " seed directory %s" % cfg_ds.seed_dir)
return cfg_ds
@@ -749,17 +750,4 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.network_json, known_macs=KNOWN_MACS)
-def populate_dir(seed_dir, files):
- for (name, content) in files.items():
- path = os.path.join(seed_dir, name)
- dirname = os.path.dirname(path)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
- if isinstance(content, six.text_type):
- mode = "w"
- else:
- mode = "wb"
- with open(path, mode) as fp:
- fp.write(content)
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index f264f361..3127014b 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -13,7 +13,7 @@ from cloudinit import settings
from cloudinit.sources import DataSourceDigitalOcean
from cloudinit.sources.helpers import digitalocean
-from cloudinit.tests.helpers import mock, TestCase
+from cloudinit.tests.helpers import mock, CiTestCase
DO_MULTIPLE_KEYS = ["ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@do.co",
"ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@do.co"]
@@ -135,14 +135,17 @@ def _mock_dmi():
return (True, DO_META.get('id'))
-class TestDataSourceDigitalOcean(TestCase):
+class TestDataSourceDigitalOcean(CiTestCase):
"""
Test reading the meta-data
"""
+ def setUp(self):
+ super(TestDataSourceDigitalOcean, self).setUp()
+ self.tmp = self.tmp_dir()
def get_ds(self, get_sysinfo=_mock_dmi):
ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
- settings.CFG_BUILTIN, None, helpers.Paths({}))
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp}))
ds.use_ip4LL = False
if get_sysinfo is not None:
ds._get_sysinfo = get_sysinfo
@@ -194,11 +197,10 @@ class TestDataSourceDigitalOcean(TestCase):
self.assertIsInstance(ds.get_public_ssh_keys(), list)
-class TestNetworkConvert(TestCase):
+class TestNetworkConvert(CiTestCase):
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
- def _get_networking(self, m_get_by_mac):
- m_get_by_mac.return_value = {
+ def _get_networking(self):
+ self.m_get_by_mac.return_value = {
'04:01:57:d1:9e:01': 'ens1',
'04:01:57:d1:9e:02': 'ens2',
'b8:ae:ed:75:5f:9a': 'enp0s25',
@@ -208,6 +210,10 @@ class TestNetworkConvert(TestCase):
self.assertIn('config', netcfg)
return netcfg
+ def setUp(self):
+ super(TestNetworkConvert, self).setUp()
+ self.add_patch('cloudinit.net.get_interfaces_by_mac', 'm_get_by_mac')
+
def test_networking_defined(self):
netcfg = self._get_networking()
self.assertIsNotNone(netcfg)
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index ba328ee9..0f7267bb 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -186,6 +186,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
super(TestEc2, self).setUp()
self.datasource = ec2.DataSourceEc2
self.metadata_addr = self.datasource.metadata_urls[0]
+ self.tmp = self.tmp_dir()
def data_url(self, version):
"""Return a metadata url based on the version provided."""
@@ -199,7 +200,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
def _setup_ds(self, sys_cfg, platform_data, md, md_version=None):
self.uris = []
distro = {}
- paths = helpers.Paths({})
+ paths = helpers.Paths({'run_dir': self.tmp})
if sys_cfg is None:
sys_cfg = {}
ds = self.datasource(sys_cfg=sys_cfg, distro=distro, paths=paths)
@@ -329,7 +330,8 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds.fallback_nic = 'eth9'
with mock.patch(get_interface_mac_path) as m_get_interface_mac:
m_get_interface_mac.return_value = mac1
- ds.network_config # Will re-crawl network metadata
+ nc = ds.network_config # Will re-crawl network metadata
+ self.assertIsNotNone(nc)
self.assertIn('Re-crawl of metadata service', self.logs.getvalue())
expected = {'version': 1, 'config': [
{'mac_address': '06:17:04:d7:26:09',
@@ -423,7 +425,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
self.logs.getvalue())
@httpretty.activate
- @mock.patch('cloudinit.net.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
@mock.patch('cloudinit.net.find_fallback_nic')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
@mock.patch('cloudinit.sources.DataSourceEc2.util.is_FreeBSD')
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index d399ae7a..f77c2c40 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -4,13 +4,16 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
+import datetime
import httpretty
+import json
import mock
import re
from base64 import b64encode, b64decode
from six.moves.urllib_parse import urlparse
+from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceGCE
@@ -21,10 +24,7 @@ from cloudinit.tests import helpers as test_helpers
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
- 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
'instance/hostname': 'server.project-foo.local',
- # UnicodeDecodeError below if set to ds.userdata instead of userdata_raw
- 'instance/attributes/user-data': b'/bin/echo \xff\n',
}
GCE_META_PARTIAL = {
@@ -37,11 +37,13 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
- 'instance/attributes/user-data-encoding': 'base64',
+ 'instance/attributes': {
+ 'user-data': b64encode(b'/bin/echo baz\n').decode('utf-8'),
+ 'user-data-encoding': 'base64',
+ }
}
-HEADERS = {'X-Google-Metadata-Request': 'True'}
+HEADERS = {'Metadata-Flavor': 'Google'}
MD_URL_RE = re.compile(
r'http://metadata.google.internal/computeMetadata/v1/.*')
@@ -54,10 +56,15 @@ def _set_mock_metadata(gce_meta=None):
url_path = urlparse(uri).path
if url_path.startswith('/computeMetadata/v1/'):
path = url_path.split('/computeMetadata/v1/')[1:][0]
+ recursive = path.endswith('/')
+ path = path.rstrip('/')
else:
path = None
if path in gce_meta:
- return (200, headers, gce_meta.get(path))
+ response = gce_meta.get(path)
+ if recursive:
+ response = json.dumps(response)
+ return (200, headers, response)
else:
return (404, headers, '')
@@ -69,10 +76,21 @@ def _set_mock_metadata(gce_meta=None):
@httpretty.activate
class TestDataSourceGCE(test_helpers.HttprettyTestCase):
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
def setUp(self):
+ tmp = self.tmp_dir()
self.ds = DataSourceGCE.DataSourceGCE(
settings.CFG_BUILTIN, None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': tmp}))
ppatch = self.m_platform_reports_gce = mock.patch(
'cloudinit.sources.DataSourceGCE.platform_reports_gce')
self.m_platform_reports_gce = ppatch.start()
@@ -89,6 +107,10 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertDictContainsSubset(HEADERS, req_header)
def test_metadata(self):
+ # UnicodeDecodeError if set to ds.userdata instead of userdata_raw
+ meta = GCE_META.copy()
+ meta['instance/attributes/user-data'] = b'/bin/echo \xff\n'
+
_set_mock_metadata()
self.ds.get_data()
@@ -117,8 +139,8 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
_set_mock_metadata(GCE_META_ENCODING)
self.ds.get_data()
- decoded = b64decode(
- GCE_META_ENCODING.get('instance/attributes/user-data'))
+ instance_data = GCE_META_ENCODING.get('instance/attributes')
+ decoded = b64decode(instance_data.get('user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
def test_missing_required_keys_return_false(self):
@@ -130,33 +152,124 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, self.ds.get_data())
httpretty.reset()
- def test_project_level_ssh_keys_are_used(self):
+ def test_no_ssh_keys_metadata(self):
_set_mock_metadata()
self.ds.get_data()
+ self.assertEqual([], self.ds.get_public_ssh_keys())
+
+ def test_cloudinit_ssh_keys(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
+
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ self.ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ @mock.patch("cloudinit.sources.DataSourceGCE.ug_util")
+ def test_default_user_ssh_keys(self, mock_ug_util):
+ mock_ug_util.normalize_users_groups.return_value = None, None
+ mock_ug_util.extract_default.return_value = 'ubuntu', None
+ ubuntu_ds = DataSourceGCE.DataSourceGCE(
+ settings.CFG_BUILTIN, self._make_distro('ubuntu'),
+ helpers.Paths({'run_dir': self.tmp_dir()}))
+
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ ubuntu_ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(ubuntu_ds.get_public_ssh_keys()))
+
+ def test_instance_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'block-project-ssh-keys': 'False',
+ }
- def test_instance_level_ssh_keys_are_used(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertIn(key_content, self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(key) for key in range(2)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ def test_block_project_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'block-project-ssh-keys': 'True',
+ }
- def test_instance_level_keys_replace_project_level_keys(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertEqual([key_content], self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(0)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
def test_only_last_part_of_zone_used_for_availability_zone(self):
_set_mock_metadata()
@@ -171,5 +284,44 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, ret)
m_fetcher.assert_not_called()
+ def test_has_expired(self):
+
+ def _get_timestamp(days):
+ format_str = '%Y-%m-%dT%H:%M:%S+0000'
+ today = datetime.datetime.now()
+ timestamp = today + datetime.timedelta(days=days)
+ return timestamp.strftime(format_str)
+
+ past = _get_timestamp(-1)
+ future = _get_timestamp(1)
+ ssh_keys = {
+ None: False,
+ '': False,
+ 'Invalid': False,
+ 'user:ssh-rsa key user@domain.com': False,
+ 'user:ssh-rsa key google {"expireOn":"%s"}' % past: False,
+ 'user:ssh-rsa key google-ssh': False,
+ 'user:ssh-rsa key google-ssh {invalid:json}': False,
+ 'user:ssh-rsa key google-ssh {"userName":"user"}': False,
+ 'user:ssh-rsa key google-ssh {"expireOn":"invalid"}': False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % future: False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % past: True,
+ }
+
+ for key, expired in ssh_keys.items():
+ self.assertEqual(DataSourceGCE._has_expired(key), expired)
+
+ def test_parse_public_keys_non_ascii(self):
+ public_key_data = [
+ 'cloudinit:rsa ssh-ke%s invalid' % chr(165),
+ 'use%sname:rsa ssh-key' % chr(174),
+ 'cloudinit:test 1',
+ 'default:test 2',
+ 'user:test 3',
+ ]
+ expected = ['test 1', 'test 2']
+ found = DataSourceGCE._parse_public_keys(
+ public_key_data, default_user='default')
+ self.assertEqual(sorted(found), sorted(expected))
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 289c6a40..6e4031cf 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,6 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
from copy import copy
+import mock
import os
import shutil
import tempfile
@@ -8,15 +9,10 @@ import yaml
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from cloudinit.tests.helpers import TestCase, populate_dir
+from cloudinit.tests.helpers import CiTestCase, populate_dir
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-class TestMAASDataSource(TestCase):
+class TestMAASDataSource(CiTestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
@@ -159,4 +155,47 @@ class TestMAASDataSource(TestCase):
self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
self.assertEqual(expected_vd, vd)
+
+@mock.patch("cloudinit.sources.DataSourceMAAS.url_helper.OauthUrlHelper")
+class TestGetOauthHelper(CiTestCase):
+ with_logs = True
+ base_cfg = {'consumer_key': 'FAKE_CONSUMER_KEY',
+ 'token_key': 'FAKE_TOKEN_KEY',
+ 'token_secret': 'FAKE_TOKEN_SECRET',
+ 'consumer_secret': None}
+
+ def test_all_required(self, m_helper):
+ """Valid config as expected."""
+ DataSourceMAAS.get_oauth_helper(self.base_cfg.copy())
+ m_helper.assert_has_calls([mock.call(**self.base_cfg)])
+
+ def test_other_fields_not_passed_through(self, m_helper):
+ """Only relevant fields are passed through."""
+ mycfg = self.base_cfg.copy()
+ mycfg['unrelated_field'] = 'unrelated'
+ DataSourceMAAS.get_oauth_helper(mycfg)
+ m_helper.assert_has_calls([mock.call(**self.base_cfg)])
+
+
+class TestGetIdHash(CiTestCase):
+ v1_cfg = {'consumer_key': 'CKEY', 'token_key': 'TKEY',
+ 'token_secret': 'TSEC'}
+ v1_id = (
+ 'v1:'
+ '403ee5f19c956507f1d0e50814119c405902137ea4f8838bde167c5da8110392')
+
+ def test_v1_expected(self):
+ """Test v1 id generated as expected working behavior from config."""
+ result = DataSourceMAAS.get_id_from_ds_cfg(self.v1_cfg.copy())
+ self.assertEqual(self.v1_id, result)
+
+ def test_v1_extra_fields_are_ignored(self):
+ """Test v1 id ignores unused entries in config."""
+ cfg = self.v1_cfg.copy()
+ cfg['consumer_secret'] = "BOO"
+ cfg['unrelated'] = "HI MOM"
+ result = DataSourceMAAS.get_id_from_ds_cfg(cfg)
+ self.assertEqual(self.v1_id, result)
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index fea9156b..70d50de4 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -3,22 +3,20 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from cloudinit.tests.helpers import TestCase, populate_dir, mock, ExitStack
+from cloudinit.tests.helpers import CiTestCase, populate_dir, mock, ExitStack
import os
-import shutil
-import tempfile
import textwrap
import yaml
-class TestNoCloudDataSource(TestCase):
+class TestNoCloudDataSource(CiTestCase):
def setUp(self):
super(TestNoCloudDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
@@ -215,7 +213,7 @@ class TestNoCloudDataSource(TestCase):
self.assertNotIn(gateway, str(dsrc.network_config))
-class TestParseCommandLineData(TestCase):
+class TestParseCommandLineData(CiTestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index e7d55692..5c3ba012 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -3,12 +3,11 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from cloudinit.tests.helpers import mock, populate_dir, TestCase
+from cloudinit.tests.helpers import mock, populate_dir, CiTestCase
+from textwrap import dedent
import os
import pwd
-import shutil
-import tempfile
import unittest
@@ -32,18 +31,20 @@ USER_DATA = '#cloud-config\napt_upgrade: true'
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
HOSTNAME = 'foo.example.com'
PUBLIC_IP = '10.0.0.3'
+MACADDR = '02:00:0a:12:01:01'
+IP_BY_MACADDR = '10.18.1.1'
DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
-class TestOpenNebulaDataSource(TestCase):
+class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
# defaults for few tests
self.ds = ds.DataSourceOpenNebula
@@ -197,24 +198,96 @@ class TestOpenNebulaDataSource(TestCase):
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_hostname(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = {'02:00:0a:12:01:01': 'eth0'}
- for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: PUBLIC_IP})
- results = ds.read_context_disk_dir(my_d)
+ for dev in ('eth0', 'ens3'):
+ m_get_phys_by_mac.return_value = {MACADDR: dev}
+ for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: PUBLIC_IP})
+ results = ds.read_context_disk_dir(my_d)
- self.assertTrue('metadata' in results)
- self.assertTrue('local-hostname' in results['metadata'])
- self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
+ self.assertTrue('metadata' in results)
+ self.assertTrue('local-hostname' in results['metadata'])
+ self.assertEqual(
+ PUBLIC_IP, results['metadata']['local-hostname'])
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_network_interfaces(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = {'02:00:0a:12:01:01': 'eth0'}
- populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('network-interfaces' in results)
- self.assertTrue('1.2.3.4' in results['network-interfaces'])
+ for dev in ('eth0', 'ens3'):
+ m_get_phys_by_mac.return_value = {MACADDR: dev}
+
+ # without ETH0_MAC
+ # for Older OpenNebula?
+ populate_context_dir(self.seed_dir, {'ETH0_IP': IP_BY_MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_IP and ETH0_MAC
+ populate_context_dir(
+ self.seed_dir, {'ETH0_IP': IP_BY_MACADDR, 'ETH0_MAC': MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_IP with empty string and ETH0_MAC
+ # in the case of using Virtual Network contains
+ # "AR = [ TYPE = ETHER ]"
+ populate_context_dir(
+ self.seed_dir, {'ETH0_IP': '', 'ETH0_MAC': MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_NETWORK
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_NETWORK': '10.18.0.0'
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('10.18.0.0' in results['network-interfaces'])
+
+ # ETH0_NETWORK with empty string
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_NETWORK': ''
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('10.18.1.0' in results['network-interfaces'])
+
+ # ETH0_MASK
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_MASK': '255.255.0.0'
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('255.255.0.0' in results['network-interfaces'])
+
+ # ETH0_MASK with empty string
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_MASK': ''
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('255.255.255.0' in results['network-interfaces'])
def test_find_candidates(self):
def my_devs_with(criteria):
@@ -235,7 +308,7 @@ class TestOpenNebulaDataSource(TestCase):
class TestOpenNebulaNetwork(unittest.TestCase):
- system_nics = {'02:00:0a:12:01:01': 'eth0'}
+ system_nics = ('eth0', 'ens3')
def test_lo(self):
net = ds.OpenNebulaNetwork(context={}, system_nics_by_mac={})
@@ -246,45 +319,101 @@ iface lo inet loopback
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_eth0(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = self.system_nics
- net = ds.OpenNebulaNetwork({})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 10.18.1.1
- network 10.18.1.0
- netmask 255.255.255.0
-''')
+ for nic in self.system_nics:
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork({})
+ self.assertEqual(net.gen_conf(), dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto {dev}
+ iface {dev} inet static
+ #hwaddress {macaddr}
+ address 10.18.1.1
+ network 10.18.1.0
+ netmask 255.255.255.0
+ """.format(dev=nic, macaddr=MACADDR)))
def test_eth0_override(self):
context = {
'DNS': '1.2.3.8',
- 'ETH0_IP': '1.2.3.4',
- 'ETH0_NETWORK': '1.2.3.0',
+ 'ETH0_IP': '10.18.1.1',
+ 'ETH0_NETWORK': '10.18.0.0',
'ETH0_MASK': '255.255.0.0',
'ETH0_GATEWAY': '1.2.3.5',
'ETH0_DOMAIN': 'example.com',
- 'ETH0_DNS': '1.2.3.6 1.2.3.7'
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_MAC': '02:00:0a:12:01:01'
}
-
- net = ds.OpenNebulaNetwork(context,
- system_nics_by_mac=self.system_nics)
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 1.2.3.4
- network 1.2.3.0
- netmask 255.255.0.0
- gateway 1.2.3.5
- dns-search example.com
- dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
-''')
+ for nic in self.system_nics:
+ expected = dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto {dev}
+ iface {dev} inet static
+ #hwaddress {macaddr}
+ address 10.18.1.1
+ network 10.18.0.0
+ netmask 255.255.0.0
+ gateway 1.2.3.5
+ dns-search example.com
+ dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
+ """).format(dev=nic, macaddr=MACADDR)
+ net = ds.OpenNebulaNetwork(context,
+ system_nics_by_mac={MACADDR: nic})
+ self.assertEqual(expected, net.gen_conf())
+
+ def test_multiple_nics(self):
+ """Test rendering multiple nics with names that differ from context."""
+ MAC_1 = "02:00:0a:12:01:01"
+ MAC_2 = "02:00:0a:12:01:02"
+ context = {
+ 'DNS': '1.2.3.8',
+ 'ETH0_IP': '10.18.1.1',
+ 'ETH0_NETWORK': '10.18.0.0',
+ 'ETH0_MASK': '255.255.0.0',
+ 'ETH0_GATEWAY': '1.2.3.5',
+ 'ETH0_DOMAIN': 'example.com',
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_MAC': MAC_2,
+ 'ETH3_IP': '10.3.1.3',
+ 'ETH3_NETWORK': '10.3.0.0',
+ 'ETH3_MASK': '255.255.0.0',
+ 'ETH3_GATEWAY': '10.3.0.1',
+ 'ETH3_DOMAIN': 'third.example.com',
+ 'ETH3_DNS': '10.3.1.2',
+ 'ETH3_MAC': MAC_1,
+ }
+ net = ds.OpenNebulaNetwork(
+ context, system_nics_by_mac={MAC_1: 'enp0s25', MAC_2: 'enp1s2'})
+
+ expected = dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto enp0s25
+ iface enp0s25 inet static
+ #hwaddress 02:00:0a:12:01:01
+ address 10.3.1.3
+ network 10.3.0.0
+ netmask 255.255.0.0
+ gateway 10.3.0.1
+ dns-search third.example.com
+ dns-nameservers 1.2.3.8 10.3.1.2
+
+ auto enp1s2
+ iface enp1s2 inet static
+ #hwaddress 02:00:0a:12:01:02
+ address 10.18.1.1
+ network 10.18.0.0
+ netmask 255.255.0.0
+ gateway 1.2.3.5
+ dns-search example.com
+ dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
+ """)
+
+ self.assertEqual(expected, net.gen_conf())
class TestParseShellConfig(unittest.TestCase):
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index ed367e05..42c31554 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -131,6 +131,10 @@ def _read_metadata_service():
class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
VERSION = 'latest'
+ def setUp(self):
+ super(TestOpenStackDataSource, self).setUp()
+ self.tmp = self.tmp_dir()
+
@hp.activate
def test_successful(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
@@ -232,7 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertTrue(found)
@@ -256,7 +260,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertFalse(found)
@@ -271,7 +275,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
ds_os.ds_cfg = {
'max_wait': 0,
'timeout': 0,
@@ -294,7 +298,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
ds_os.ds_cfg = {
'max_wait': 0,
'timeout': 0,
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 700da86c..fc4eb36e 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -5,11 +5,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
import base64
-from collections import OrderedDict
+import os
-from cloudinit.tests import helpers as test_helpers
+from collections import OrderedDict
+from textwrap import dedent
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase, wrap_and_call
+from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceOVF as dsovf
+from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
+ CustomScriptNotFound)
OVF_ENV_CONTENT = """<?xml version="1.0" encoding="UTF-8"?>
<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
@@ -42,7 +48,7 @@ def fill_properties(props, template=OVF_ENV_CONTENT):
return template.format(properties=properties)
-class TestReadOvfEnv(test_helpers.TestCase):
+class TestReadOvfEnv(CiTestCase):
def test_with_b64_userdata(self):
user_data = "#!/bin/sh\necho hello world\n"
user_data_b64 = base64.b64encode(user_data.encode()).decode()
@@ -72,7 +78,104 @@ class TestReadOvfEnv(test_helpers.TestCase):
self.assertIsNone(ud)
-class TestTransportIso9660(test_helpers.CiTestCase):
+class TestMarkerFiles(CiTestCase):
+
+ def setUp(self):
+ super(TestMarkerFiles, self).setUp()
+ self.tdir = self.tmp_dir()
+
+ def test_false_when_markerid_none(self):
+ """Return False when markerid provided is None."""
+ self.assertFalse(
+ dsovf.check_marker_exists(markerid=None, marker_dir=self.tdir))
+
+ def test_markerid_file_exist(self):
+ """Return False when markerid file path does not exist,
+ True otherwise."""
+ self.assertFalse(
+ dsovf.check_marker_exists('123', self.tdir))
+
+ marker_file = self.tmp_path('.markerfile-123.txt', self.tdir)
+ util.write_file(marker_file, '')
+ self.assertTrue(
+ dsovf.check_marker_exists('123', self.tdir)
+ )
+
+ def test_marker_file_setup(self):
+ """Test creation of marker files."""
+ markerfilepath = self.tmp_path('.markerfile-hi.txt', self.tdir)
+ self.assertFalse(os.path.exists(markerfilepath))
+ dsovf.setup_marker_files(markerid='hi', marker_dir=self.tdir)
+ self.assertTrue(os.path.exists(markerfilepath))
+
+
+class TestDatasourceOVF(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestDatasourceOVF, self).setUp()
+ self.datasource = dsovf.DataSourceOVF
+ self.tdir = self.tmp_dir()
+
+ def test_get_data_false_on_none_dmi_data(self):
+ """When dmi for system-product-name is None, get_data returns False."""
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
+ retcode = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': None},
+ ds.get_data)
+ self.assertFalse(retcode, 'Expected False return from ds.get_data')
+ self.assertIn(
+ 'DEBUG: No system-product-name found', self.logs.getvalue())
+
+ def test_get_data_no_vmware_customization_disabled(self):
+ """When vmware customization is disabled via sys_cfg log a message."""
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+ retcode = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware'},
+ ds.get_data)
+ self.assertFalse(retcode, 'Expected False return from ds.get_data')
+ self.assertIn(
+ 'DEBUG: Customization for VMware platform is disabled.',
+ self.logs.getvalue())
+
+ def test_get_data_vmware_customization_disabled(self):
+ """When cloud-init workflow for vmware is enabled via sys_cfg log a
+ message.
+ """
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': False}, distro={},
+ paths=paths)
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345345
+ """)
+ util.write_file(conf_file, conf_content)
+ with self.assertRaises(CustomScriptNotFound) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+ customscript = self.tmp_path('test-script', self.tdir)
+ self.assertIn('Script %s not found!!' % customscript,
+ str(context.exception))
+
+
+class TestTransportIso9660(CiTestCase):
def setUp(self):
super(TestTransportIso9660, self).setUp()
diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py
index 436df9ee..8dec06b1 100644
--- a/tests/unittests/test_datasource/test_scaleway.py
+++ b/tests/unittests/test_datasource/test_scaleway.py
@@ -9,7 +9,7 @@ from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceScaleway
-from cloudinit.tests.helpers import mock, HttprettyTestCase, TestCase
+from cloudinit.tests.helpers import mock, HttprettyTestCase, CiTestCase
class DataResponses(object):
@@ -63,7 +63,11 @@ class MetadataResponses(object):
return 200, headers, json.dumps(cls.FAKE_METADATA)
-class TestOnScaleway(TestCase):
+class TestOnScaleway(CiTestCase):
+
+ def setUp(self):
+ super(TestOnScaleway, self).setUp()
+ self.tmp = self.tmp_dir()
def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline):
mock, faked = fake_dmi
@@ -91,7 +95,7 @@ class TestOnScaleway(TestCase):
# When not on Scaleway, get_data() returns False.
datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({})
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp})
)
self.assertFalse(datasource.get_data())
@@ -159,8 +163,9 @@ def get_source_address_adapter(*args, **kwargs):
class TestDataSourceScaleway(HttprettyTestCase):
def setUp(self):
+ tmp = self.tmp_dir()
self.datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({})
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': tmp})
)
super(TestDataSourceScaleway, self).setUp()
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 933d5b63..88bae5f9 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -359,7 +359,8 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
+ self.paths = c_helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
os.mkdir(self.legacy_user_d)
diff --git a/tests/unittests/test_distros/test_create_users.py b/tests/unittests/test_distros/test_create_users.py
index aa13670a..5670904a 100644
--- a/tests/unittests/test_distros/test_create_users.py
+++ b/tests/unittests/test_distros/test_create_users.py
@@ -7,7 +7,11 @@ from cloudinit.tests.helpers import (TestCase, mock)
class MyBaseDistro(distros.Distro):
# MyBaseDistro is here to test base Distro class implementations
- def __init__(self, name="basedistro", cfg={}, paths={}):
+ def __init__(self, name="basedistro", cfg=None, paths=None):
+ if not cfg:
+ cfg = {}
+ if not paths:
+ paths = {}
super(MyBaseDistro, self).__init__(name, cfg, paths)
def install_packages(self, pkglist):
@@ -42,7 +46,6 @@ class MyBaseDistro(distros.Distro):
@mock.patch("cloudinit.distros.util.subp")
class TestCreateUser(TestCase):
def setUp(self):
- super(TestCase, self).setUp()
self.dist = MyBaseDistro()
def _useradd2call(self, args):
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index c4bd11bc..1c2e45fe 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -2,6 +2,8 @@
import os
from six import StringIO
+import stat
+from textwrap import dedent
try:
from unittest import mock
@@ -12,13 +14,12 @@ try:
except ImportError:
from contextlib2 import ExitStack
-from cloudinit.tests.helpers import TestCase
-
from cloudinit import distros
from cloudinit.distros.parsers.sys_conf import SysConf
from cloudinit import helpers
from cloudinit.net import eni
from cloudinit import settings
+from cloudinit.tests.helpers import FilesystemMockingTestCase
from cloudinit import util
@@ -175,7 +176,7 @@ class WriteBuffer(object):
return self.buffer.getvalue()
-class TestNetCfgDistro(TestCase):
+class TestNetCfgDistro(FilesystemMockingTestCase):
frbsd_ifout = """\
hn0: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> metric 0 mtu 1500
@@ -188,9 +189,6 @@ hn0: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> metric 0 mtu 1500
status: active
"""
- def setUp(self):
- super(TestNetCfgDistro, self).setUp()
-
def _get_distro(self, dname, renderers=None):
cls = distros.fetch(dname)
cfg = settings.CFG_BUILTIN
@@ -774,4 +772,46 @@ ifconfig_vtnet0="DHCP"
self.assertCfgEquals(expected_buf, str(write_buf))
self.assertEqual(write_buf.mode, 0o644)
+ def test_simple_write_opensuse(self):
+ """Opensuse network rendering writes appropriate sysconfg files."""
+ tmpdir = self.tmp_dir()
+ self.patchOS(tmpdir)
+ self.patchUtils(tmpdir)
+ distro = self._get_distro('opensuse')
+
+ distro.apply_network(BASE_NET_CFG, False)
+
+ lo_path = os.path.join(tmpdir, 'etc/sysconfig/network/ifcfg-lo')
+ eth0_path = os.path.join(tmpdir, 'etc/sysconfig/network/ifcfg-eth0')
+ eth1_path = os.path.join(tmpdir, 'etc/sysconfig/network/ifcfg-eth1')
+ expected_cfgs = {
+ lo_path: dedent('''
+ STARTMODE="auto"
+ USERCONTROL="no"
+ FIREWALL="no"
+ '''),
+ eth0_path: dedent('''
+ BOOTPROTO="static"
+ BROADCAST="192.168.1.0"
+ GATEWAY="192.168.1.254"
+ IPADDR="192.168.1.5"
+ NETMASK="255.255.255.0"
+ STARTMODE="auto"
+ USERCONTROL="no"
+ ETHTOOL_OPTIONS=""
+ '''),
+ eth1_path: dedent('''
+ BOOTPROTO="dhcp"
+ STARTMODE="auto"
+ USERCONTROL="no"
+ ETHTOOL_OPTIONS=""
+ ''')
+ }
+ for cfgpath in (lo_path, eth0_path, eth1_path):
+ self.assertCfgEquals(
+ expected_cfgs[cfgpath],
+ util.load_file(cfgpath))
+ file_stat = os.stat(cfgpath)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index 1284e755..31cc6223 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -7,7 +7,7 @@ from uuid import uuid4
from cloudinit import safeyaml
from cloudinit import util
from cloudinit.tests.helpers import (
- CiTestCase, dir2dict, json_dumps, populate_dir)
+ CiTestCase, dir2dict, populate_dir)
UNAME_MYSYS = ("Linux bart 4.4.0-62-generic #83-Ubuntu "
"SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux")
@@ -27,11 +27,20 @@ TYPE=ext4
PARTUUID=30c65c77-e07d-4039-b2fb-88b1fb5fa1fc
"""
+# this is a Ubuntu 18.04 disk.img output (dual uefi and bios bootable)
+BLKID_UEFI_UBUNTU = [
+ {'DEVNAME': 'vda1', 'TYPE': 'ext4', 'PARTUUID': uuid4(), 'UUID': uuid4()},
+ {'DEVNAME': 'vda14', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'vda15', 'TYPE': 'vfat', 'LABEL': 'UEFI', 'PARTUUID': uuid4(),
+ 'UUID': '5F55-129B'}]
+
+
POLICY_FOUND_ONLY = "search,found=all,maybe=none,notfound=disabled"
POLICY_FOUND_OR_MAYBE = "search,found=all,maybe=all,notfound=disabled"
DI_DEFAULT_POLICY = "search,found=all,maybe=all,notfound=enabled"
DI_DEFAULT_POLICY_NO_DMI = "search,found=all,maybe=all,notfound=disabled"
DI_EC2_STRICT_ID_DEFAULT = "true"
+OVF_MATCH_STRING = 'http://schemas.dmtf.org/ovf/environment/1'
SHELL_MOCK_TMPL = """\
%(name)s() {
@@ -55,6 +64,7 @@ P_SEED_DIR = "var/lib/cloud/seed"
P_DSID_CFG = "etc/cloud/ds-identify.cfg"
MOCK_VIRT_IS_KVM = {'name': 'detect_virt', 'RET': 'kvm', 'ret': 0}
+MOCK_VIRT_IS_VMWARE = {'name': 'detect_virt', 'RET': 'vmware', 'ret': 0}
MOCK_UNAME_IS_PPC64 = {'name': 'uname', 'out': UNAME_PPC64EL, 'ret': 0}
@@ -222,6 +232,11 @@ class TestDsIdentify(CiTestCase):
self._test_ds_found('ConfigDrive')
return
+ def test_config_drive_upper(self):
+ """ConfigDrive datasource has a disk with LABEL=CONFIG-2."""
+ self._test_ds_found('ConfigDriveUpper')
+ return
+
def test_policy_disabled(self):
"""A Builtin policy of 'disabled' should return not found.
@@ -296,6 +311,54 @@ class TestDsIdentify(CiTestCase):
data, RC_FOUND, dslist=['OpenStack', 'None'])
self.assertIn("check for 'OpenStack' returned maybe", err)
+ def test_default_ovf_is_found(self):
+ """OVF is identified found when ovf/ovf-env.xml seed file exists."""
+ self._test_ds_found('OVF-seed')
+
+ def test_default_ovf_with_detect_virt_none_not_found(self):
+ """OVF identifies not found when detect_virt returns "none"."""
+ self._check_via_dict(
+ {'ds': 'OVF'}, rc=RC_NOT_FOUND, policy_dmi="disabled")
+
+ def test_default_ovf_returns_not_found_on_azure(self):
+ """OVF datasource won't be found as false positive on Azure."""
+ ovfonazure = copy.deepcopy(VALID_CFG['OVF'])
+ # Set azure asset tag to assert OVF content not found
+ ovfonazure['files'][P_CHASSIS_ASSET_TAG] = (
+ '7783-7084-3265-9085-8269-3286-77\n')
+ self._check_via_dict(
+ ovfonazure, RC_FOUND, dslist=['Azure', DS_NONE])
+
+ def test_ovf_on_vmware_iso_found_by_cdrom_with_ovf_schema_match(self):
+ """OVF is identified when iso9660 cdrom path contains ovf schema."""
+ self._test_ds_found('OVF')
+
+ def test_ovf_on_vmware_iso_found_when_vmware_customization(self):
+ """OVF is identified when vmware customization is enabled."""
+ self._test_ds_found('OVF-vmware-customization')
+
+ def test_ovf_on_vmware_iso_found_by_cdrom_with_matching_fs_label(self):
+ """OVF is identified by well-known iso9660 labels."""
+ ovf_cdrom_by_label = copy.deepcopy(VALID_CFG['OVF'])
+ # Unset matching cdrom ovf schema content
+ ovf_cdrom_by_label['files']['dev/sr0'] = 'No content match'
+ self._check_via_dict(
+ ovf_cdrom_by_label, rc=RC_NOT_FOUND, policy_dmi="disabled")
+
+ # Add recognized labels
+ valid_ovf_labels = ['ovf-transport', 'OVF-TRANSPORT',
+ "OVFENV", "ovfenv"]
+ for valid_ovf_label in valid_ovf_labels:
+ ovf_cdrom_by_label['mocks'][0]['out'] = blkid_out([
+ {'DEVNAME': 'sr0', 'TYPE': 'iso9660',
+ 'LABEL': valid_ovf_label}])
+ self._check_via_dict(
+ ovf_cdrom_by_label, rc=RC_FOUND, dslist=['OVF', DS_NONE])
+
+ def test_default_nocloud_as_vdb_iso9660(self):
+ """NoCloud is found with iso9660 filesystem on non-cdrom disk."""
+ self._test_ds_found('NoCloud')
+
def blkid_out(disks=None):
"""Convert a list of disk dictionaries into blkid content."""
@@ -305,7 +368,9 @@ def blkid_out(disks=None):
for disk in disks:
if not disk["DEVNAME"].startswith("/dev/"):
disk["DEVNAME"] = "/dev/" + disk["DEVNAME"]
- for key in disk:
+ # devname needs to be first.
+ lines.append("%s=%s" % ("DEVNAME", disk["DEVNAME"]))
+ for key in [d for d in disk if d != "DEVNAME"]:
lines.append("%s=%s" % (key, disk[key]))
lines.append("")
return '\n'.join(lines)
@@ -319,7 +384,7 @@ def _print_run_output(rc, out, err, cfg, files):
'-- rc = %s --' % rc,
'-- out --', str(out),
'-- err --', str(err),
- '-- cfg --', json_dumps(cfg)]))
+ '-- cfg --', util.json_dumps(cfg)]))
print('-- files --')
for k, v in files.items():
if "/_shwrap" in k:
@@ -376,6 +441,19 @@ VALID_CFG = {
'files': {P_PRODUCT_SERIAL: 'GoogleCloud-8f2e88f\n'},
'mocks': [MOCK_VIRT_IS_KVM],
},
+ 'NoCloud': {
+ 'ds': 'NoCloud',
+ 'mocks': [
+ MOCK_VIRT_IS_KVM,
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ BLKID_UEFI_UBUNTU +
+ [{'DEVNAME': 'vdb', 'TYPE': 'iso9660', 'LABEL': 'cidata'}])},
+ ],
+ 'files': {
+ 'dev/vdb': 'pretend iso content for cidata\n',
+ }
+ },
'OpenStack': {
'ds': 'OpenStack',
'files': {P_PRODUCT_NAME: 'OpenStack Nova\n'},
@@ -383,6 +461,43 @@ VALID_CFG = {
'policy_dmi': POLICY_FOUND_ONLY,
'policy_no_dmi': POLICY_FOUND_ONLY,
},
+ 'OVF-seed': {
+ 'ds': 'OVF',
+ 'files': {
+ os.path.join(P_SEED_DIR, 'ovf', 'ovf-env.xml'): 'present\n',
+ }
+ },
+ 'OVF-vmware-customization': {
+ 'ds': 'OVF',
+ 'mocks': [
+ # Include a mockes iso9660 potential, even though content not ovf
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ [{'DEVNAME': 'sr0', 'TYPE': 'iso9660', 'LABEL': ''}])
+ },
+ MOCK_VIRT_IS_VMWARE,
+ ],
+ 'files': {
+ 'dev/sr0': 'no match',
+ # Setup vmware customization enabled
+ 'usr/lib/vmware-tools/plugins/vmsvc/libdeployPkgPlugin.so': 'here',
+ 'etc/cloud/cloud.cfg': 'disable_vmware_customization: false\n',
+ }
+ },
+ 'OVF': {
+ 'ds': 'OVF',
+ 'mocks': [
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ [{'DEVNAME': 'vda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'sr0', 'TYPE': 'iso9660', 'LABEL': ''}])
+ },
+ MOCK_VIRT_IS_VMWARE,
+ ],
+ 'files': {
+ 'dev/sr0': 'pretend ovf iso has ' + OVF_MATCH_STRING + '\n',
+ }
+ },
'ConfigDrive': {
'ds': 'ConfigDrive',
'mocks': [
@@ -395,6 +510,18 @@ VALID_CFG = {
},
],
},
+ 'ConfigDriveUpper': {
+ 'ds': 'ConfigDrive',
+ 'mocks': [
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ [{'DEVNAME': 'vda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'vda2', 'TYPE': 'ext4',
+ 'LABEL': 'cloudimg-rootfs', 'PARTUUID': uuid4()},
+ {'DEVNAME': 'vdb', 'TYPE': 'vfat', 'LABEL': 'CONFIG-2'}])
+ },
+ ],
+ },
}
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
index e0d9ab6c..a2054980 100644
--- a/tests/unittests/test_handler/test_handler_lxd.py
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -25,9 +25,6 @@ class TestLxd(t_help.CiTestCase):
}
}
- def setUp(self):
- super(TestLxd, self).setUp()
-
def _get_cloud(self, distro):
cls = distros.fetch(distro)
paths = helpers.Paths({})
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 85a0fe0a..3c726422 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -9,9 +9,6 @@ from cloudinit.tests.helpers import mock
class TestLoadPowerState(t_help.TestCase):
- def setUp(self):
- super(self.__class__, self).setUp()
-
def test_no_config(self):
# completely empty config should mean do nothing
(cmd, _timeout, _condition) = psc.load_power_state({})
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index 29d5574d..5aa3c498 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -1,7 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.config.cc_resizefs import (
- can_skip_resize, handle, maybe_get_writable_device_path)
+ can_skip_resize, handle, maybe_get_writable_device_path, _resize_btrfs)
from collections import namedtuple
import logging
@@ -293,5 +293,25 @@ class TestMaybeGetDevicePathAsWritableBlock(CiTestCase):
" per kernel cmdline",
self.logs.getvalue())
+ @mock.patch('cloudinit.util.mount_is_read_write')
+ @mock.patch('cloudinit.config.cc_resizefs.os.path.isdir')
+ def test_resize_btrfs_mount_is_ro(self, m_is_dir, m_is_rw):
+ """Do not resize / directly if it is read-only. (LP: #1734787)."""
+ m_is_rw.return_value = False
+ m_is_dir.return_value = True
+ self.assertEqual(
+ ('btrfs', 'filesystem', 'resize', 'max', '//.snapshots'),
+ _resize_btrfs("/", "/dev/sda1"))
+
+ @mock.patch('cloudinit.util.mount_is_read_write')
+ @mock.patch('cloudinit.config.cc_resizefs.os.path.isdir')
+ def test_resize_btrfs_mount_is_rw(self, m_is_dir, m_is_rw):
+ """Do not resize / directly if it is read-only. (LP: #1734787)."""
+ m_is_rw.return_value = True
+ m_is_dir.return_value = True
+ self.assertEqual(
+ ('btrfs', 'filesystem', 'resize', 'max', '/'),
+ _resize_btrfs("/", "/dev/sda1"))
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index b7adbe50..b90a3af3 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -5,10 +5,6 @@ from cloudinit import util
from cloudinit.tests import helpers
-try:
- from configparser import ConfigParser
-except ImportError:
- from ConfigParser import ConfigParser
import logging
import shutil
from six import StringIO
@@ -58,8 +54,7 @@ class TestConfig(helpers.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
- parser = ConfigParser()
- parser.readfp(StringIO(contents))
+ parser = self.parse_and_read(StringIO(contents))
expected = {
'epel_testing': {
'name': 'Extra Packages for Enterprise Linux 5 - Testing',
@@ -95,8 +90,7 @@ class TestConfig(helpers.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
contents = util.load_file("/etc/yum.repos.d/puppetlabs_products.repo")
- parser = ConfigParser()
- parser.readfp(StringIO(contents))
+ parser = self.parse_and_read(StringIO(contents))
expected = {
'puppetlabs_products': {
'name': 'Puppet Labs Products El 6 - $basearch',
diff --git a/tests/unittests/test_handler/test_handler_zypper_add_repo.py b/tests/unittests/test_handler/test_handler_zypper_add_repo.py
index 315c2a5e..72ab6c08 100644
--- a/tests/unittests/test_handler/test_handler_zypper_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_zypper_add_repo.py
@@ -9,10 +9,6 @@ from cloudinit import util
from cloudinit.tests import helpers
from cloudinit.tests.helpers import mock
-try:
- from configparser import ConfigParser
-except ImportError:
- from ConfigParser import ConfigParser
import logging
from six import StringIO
@@ -70,8 +66,7 @@ class TestConfig(helpers.FilesystemMockingTestCase):
root_d = self.tmp_dir()
cc_zypper_add_repo._write_repos(cfg['repos'], root_d)
contents = util.load_file("%s/testing-foo.repo" % root_d)
- parser = ConfigParser()
- parser.readfp(StringIO(contents))
+ parser = self.parse_and_read(StringIO(contents))
expected = {
'testing-foo': {
'name': 'test-foo',
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index f3fa2a30..ac33e8ef 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1,9 +1,9 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit import net
-from cloudinit.net import _natural_sort_key
from cloudinit.net import cmdline
from cloudinit.net import eni
+from cloudinit.net import natural_sort_key
from cloudinit.net import netplan
from cloudinit.net import network_state
from cloudinit.net import renderers
@@ -2708,11 +2708,11 @@ class TestInterfacesSorting(CiTestCase):
def test_natural_order(self):
data = ['ens5', 'ens6', 'ens3', 'ens20', 'ens13', 'ens2']
self.assertEqual(
- sorted(data, key=_natural_sort_key),
+ sorted(data, key=natural_sort_key),
['ens2', 'ens3', 'ens5', 'ens6', 'ens13', 'ens20'])
data2 = ['enp2s0', 'enp2s3', 'enp0s3', 'enp0s13', 'enp0s8', 'enp1s2']
self.assertEqual(
- sorted(data2, key=_natural_sort_key),
+ sorted(data2, key=natural_sort_key),
['enp0s3', 'enp0s8', 'enp0s13', 'enp1s2', 'enp2s0', 'enp2s3'])
@@ -2948,4 +2948,16 @@ class TestRenameInterfaces(CiTestCase):
mock_subp.assert_has_calls(expected)
+class TestNetworkState(CiTestCase):
+
+ def test_bcast_addr(self):
+ """Test mask_and_ipv4_to_bcast_addr proper execution."""
+ bcast_addr = network_state.mask_and_ipv4_to_bcast_addr
+ self.assertEqual("192.168.1.255",
+ bcast_addr("255.255.255.0", "192.168.1.1"))
+ self.assertEqual("128.42.7.255",
+ bcast_addr("255.255.248.0", "128.42.5.4"))
+ self.assertEqual("10.1.21.255",
+ bcast_addr("255.255.255.0", "10.1.21.4"))
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py
index 571420ed..e15ba6cf 100644
--- a/tests/unittests/test_reporting.py
+++ b/tests/unittests/test_reporting.py
@@ -126,7 +126,7 @@ class TestBaseReportingHandler(TestCase):
def test_base_reporting_handler_is_abstract(self):
regexp = r".*abstract.*publish_event.*"
- self.assertRaisesRegexp(TypeError, regexp, handlers.ReportingHandler)
+ self.assertRaisesRegex(TypeError, regexp, handlers.ReportingHandler)
class TestLogHandler(TestCase):
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
index add93653..5d3f1ca3 100644
--- a/tests/unittests/test_runs/test_merge_run.py
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -23,6 +23,7 @@ class TestMergeRun(helpers.FilesystemMockingTestCase):
cfg = {
'datasource_list': ['None'],
'cloud_init_modules': ['write-files'],
+ 'system_info': {'paths': {'run_dir': new_root}}
}
ud = self.readResource('user_data.1.txt')
cloud_cfg = util.yaml_dumps(cfg)
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
index b8fb4794..762974e9 100644
--- a/tests/unittests/test_runs/test_simple_run.py
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -2,10 +2,10 @@
import os
-from cloudinit.tests import helpers
from cloudinit.settings import PER_INSTANCE
from cloudinit import stages
+from cloudinit.tests import helpers
from cloudinit import util
@@ -23,6 +23,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):
'datasource_list': ['None'],
'runcmd': ['ls /etc'], # test ALL_DISTROS
'spacewalk': {}, # test non-ubuntu distros module definition
+ 'system_info': {'paths': {'run_dir': self.new_root}},
'write_files': [
{
'path': '/etc/blah.ini',
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index b911d929..53154d33 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -14,7 +14,7 @@ from cloudinit import templater
try:
import Cheetah
HAS_CHEETAH = True
- Cheetah # make pyflakes happy, as Cheetah is not used here
+ c = Cheetah # make pyflakes and pylint happy, as Cheetah is not used here
except ImportError:
HAS_CHEETAH = False
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 3e4154ca..4a92e741 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -299,6 +299,14 @@ class TestLoadYaml(helpers.TestCase):
default=self.mydefault),
myobj)
+ def test_none_returns_default(self):
+ """If yaml.load returns None, then default should be returned."""
+ blobs = ("", " ", "# foo\n", "#")
+ mdef = self.mydefault
+ self.assertEqual(
+ [(b, self.mydefault) for b in blobs],
+ [(b, util.load_yaml(blob=b, default=mdef)) for b in blobs])
+
class TestMountinfoParsing(helpers.ResourceUsingTestCase):
def test_invalid_mountinfo(self):
@@ -477,6 +485,44 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
self.assertIsNone(util.read_dmi_data("system-product-name"))
+class TestGetConfigLogfiles(helpers.CiTestCase):
+
+ def test_empty_cfg_returns_empty_list(self):
+ """An empty config passed to get_config_logfiles returns empty list."""
+ self.assertEqual([], util.get_config_logfiles(None))
+ self.assertEqual([], util.get_config_logfiles({}))
+
+ def test_default_log_file_present(self):
+ """When default_log_file is set get_config_logfiles finds it."""
+ self.assertEqual(
+ ['/my.log'],
+ util.get_config_logfiles({'def_log_file': '/my.log'}))
+
+ def test_output_logs_parsed_when_teeing_files(self):
+ """When output configuration is parsed when teeing files."""
+ self.assertEqual(
+ ['/himom.log', '/my.log'],
+ sorted(util.get_config_logfiles({
+ 'def_log_file': '/my.log',
+ 'output': {'all': '|tee -a /himom.log'}})))
+
+ def test_output_logs_parsed_when_redirecting(self):
+ """When output configuration is parsed when redirecting to a file."""
+ self.assertEqual(
+ ['/my.log', '/test.log'],
+ sorted(util.get_config_logfiles({
+ 'def_log_file': '/my.log',
+ 'output': {'all': '>/test.log'}})))
+
+ def test_output_logs_parsed_when_appending(self):
+ """When output configuration is parsed when appending to a file."""
+ self.assertEqual(
+ ['/my.log', '/test.log'],
+ sorted(util.get_config_logfiles({
+ 'def_log_file': '/my.log',
+ 'output': {'all': '>> /test.log'}})))
+
+
class TestMultiLog(helpers.FilesystemMockingTestCase):
def _createConsole(self, root):
@@ -577,6 +623,7 @@ class TestSubp(helpers.CiTestCase):
utf8_valid = b'start \xc3\xa9 end'
utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
+ bogus_command = 'this-is-not-expected-to-be-a-program-name'
def printf_cmd(self, *args):
# bash's printf supports \xaa. So does /usr/bin/printf
@@ -657,15 +704,29 @@ class TestSubp(helpers.CiTestCase):
util.write_file(noshebang, 'true\n')
os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- self.assertRaisesRegexp(util.ProcessExecutionError,
- 'Missing #! in script\?',
- util.subp, (noshebang,))
+ self.assertRaisesRegex(util.ProcessExecutionError,
+ 'Missing #! in script\?',
+ util.subp, (noshebang,))
def test_returns_none_if_no_capture(self):
(out, err) = util.subp(self.stdin2out, data=b'', capture=False)
self.assertIsNone(err)
self.assertIsNone(out)
+ def test_exception_has_out_err_are_bytes_if_decode_false(self):
+ """Raised exc should have stderr, stdout as bytes if no decode."""
+ with self.assertRaises(util.ProcessExecutionError) as cm:
+ util.subp([self.bogus_command], decode=False)
+ self.assertTrue(isinstance(cm.exception.stdout, bytes))
+ self.assertTrue(isinstance(cm.exception.stderr, bytes))
+
+ def test_exception_has_out_err_are_bytes_if_decode_true(self):
+ """Raised exc should have stderr, stdout as string if no decode."""
+ with self.assertRaises(util.ProcessExecutionError) as cm:
+ util.subp([self.bogus_command], decode=True)
+ self.assertTrue(isinstance(cm.exception.stdout, six.string_types))
+ self.assertTrue(isinstance(cm.exception.stderr, six.string_types))
+
def test_bunch_of_slashes_in_path(self):
self.assertEqual("/target/my/path/",
util.target_path("/target/", "//my/path/"))
diff --git a/tests/unittests/test_vmware/__init__.py b/tests/unittests/test_vmware/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_vmware/__init__.py
diff --git a/tests/unittests/test_vmware/test_custom_script.py b/tests/unittests/test_vmware/test_custom_script.py
new file mode 100644
index 00000000..2d9519b0
--- /dev/null
+++ b/tests/unittests/test_vmware/test_custom_script.py
@@ -0,0 +1,99 @@
+# Copyright (C) 2015 Canonical Ltd.
+# Copyright (C) 2017 VMware INC.
+#
+# Author: Maitreyee Saikia <msaikia@vmware.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import util
+from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
+ CustomScriptConstant,
+ CustomScriptNotFound,
+ PreCustomScript,
+ PostCustomScript,
+)
+from cloudinit.tests.helpers import CiTestCase, mock
+
+
+class TestVmwareCustomScript(CiTestCase):
+ def setUp(self):
+ self.tmpDir = self.tmp_dir()
+
+ def test_prepare_custom_script(self):
+ """
+ This test is designed to verify the behavior based on the presence of
+ custom script. Mainly needed for scenario where a custom script is
+ expected, but was not properly copied. "CustomScriptNotFound" exception
+ is raised in such cases.
+ """
+ # Custom script does not exist.
+ preCust = PreCustomScript("random-vmw-test", self.tmpDir)
+ self.assertEqual("random-vmw-test", preCust.scriptname)
+ self.assertEqual(self.tmpDir, preCust.directory)
+ self.assertEqual(self.tmp_path("random-vmw-test", self.tmpDir),
+ preCust.scriptpath)
+ with self.assertRaises(CustomScriptNotFound):
+ preCust.prepare_script()
+
+ # Custom script exists.
+ custScript = self.tmp_path("test-cust", self.tmpDir)
+ util.write_file(custScript, "test-CR-strip/r/r")
+ postCust = PostCustomScript("test-cust", self.tmpDir)
+ self.assertEqual("test-cust", postCust.scriptname)
+ self.assertEqual(self.tmpDir, postCust.directory)
+ self.assertEqual(custScript, postCust.scriptpath)
+ self.assertFalse(postCust.postreboot)
+ postCust.prepare_script()
+ # Check if all carraige returns are stripped from script.
+ self.assertFalse("/r" in custScript)
+
+ def test_rc_local_exists(self):
+ """
+ This test is designed to verify the different scenarios associated
+ with the presence of rclocal.
+ """
+ # test when rc local does not exist
+ postCust = PostCustomScript("test-cust", self.tmpDir)
+ with mock.patch.object(CustomScriptConstant, "RC_LOCAL", "/no/path"):
+ rclocal = postCust.find_rc_local()
+ self.assertEqual("", rclocal)
+
+ # test when rc local exists
+ rclocalFile = self.tmp_path("vmware-rclocal", self.tmpDir)
+ util.write_file(rclocalFile, "# Run post-reboot guest customization",
+ omode="w")
+ with mock.patch.object(CustomScriptConstant, "RC_LOCAL", rclocalFile):
+ rclocal = postCust.find_rc_local()
+ self.assertEqual(rclocalFile, rclocal)
+ self.assertTrue(postCust.has_previous_agent, rclocal)
+
+ # test when rc local is a symlink
+ rclocalLink = self.tmp_path("dummy-rclocal-link", self.tmpDir)
+ util.sym_link(rclocalFile, rclocalLink, True)
+ with mock.patch.object(CustomScriptConstant, "RC_LOCAL", rclocalLink):
+ rclocal = postCust.find_rc_local()
+ self.assertEqual(rclocalFile, rclocal)
+
+ def test_execute_post_cust(self):
+ """
+ This test is to identify if rclocal was properly populated to be
+ run after reboot.
+ """
+ customscript = self.tmp_path("vmware-post-cust-script", self.tmpDir)
+ rclocal = self.tmp_path("vmware-rclocal", self.tmpDir)
+ # Create a temporary rclocal file
+ open(customscript, "w")
+ util.write_file(rclocal, "tests\nexit 0", omode="w")
+ postCust = PostCustomScript("vmware-post-cust-script", self.tmpDir)
+ with mock.patch.object(CustomScriptConstant, "RC_LOCAL", rclocal):
+ # Test that guest customization agent is not installed initially.
+ self.assertFalse(postCust.postreboot)
+ self.assertIs(postCust.has_previous_agent(rclocal), False)
+ postCust.install_agent()
+
+ # Assert rclocal has been modified to have guest customization
+ # agent.
+ self.assertTrue(postCust.postreboot)
+ self.assertTrue(postCust.has_previous_agent, rclocal)
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py
index 808d303a..036f6879 100644
--- a/tests/unittests/test_vmware_config_file.py
+++ b/tests/unittests/test_vmware_config_file.py
@@ -133,7 +133,8 @@ class TestVmwareConfigFile(CiTestCase):
conf = Config(cf)
with self.assertRaises(ValueError):
- conf.reset_password()
+ pw = conf.reset_password
+ self.assertIsNone(pw)
cf.clear()
cf._insertKey("PASSWORD|RESET", "yes")
@@ -334,5 +335,12 @@ class TestVmwareConfigFile(CiTestCase):
self.assertEqual('255.255.0.0', subnet.get('netmask'),
'Subnet netmask')
+ def test_custom_script(self):
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+ conf = Config(cf)
+ self.assertIsNone(conf.custom_script_name)
+ cf._insertKey("CUSTOM-SCRIPT|SCRIPT-NAME", "test-script")
+ conf = Config(cf)
+ self.assertEqual("test-script", conf.custom_script_name)
# vi: ts=4 expandtab