summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2015-02-10 20:53:20 -0500
committerScott Moser <smoser@ubuntu.com>2015-02-10 20:53:20 -0500
commitcd632b2f153a61faa48531cb41d0288650e72c71 (patch)
treea3a35b9ca437d999a15fdd25c0a0de88e3cafafb
parent78915c97c18d678db10e0fde0d9306823c5f4610 (diff)
parentf5f280cae778bd214b91664f28d9eed997fbcda5 (diff)
downloadvyos-cloud-init-cd632b2f153a61faa48531cb41d0288650e72c71.tar.gz
vyos-cloud-init-cd632b2f153a61faa48531cb41d0288650e72c71.zip
python3 support.
This gives us functional python3 support. There are likely still bugs, but instance boot on openstack is functional now. LP: #1247132
-rw-r--r--.bzrignore4
-rw-r--r--ChangeLog1
-rw-r--r--MANIFEST.in8
-rwxr-xr-xbin/cloud-init2
-rw-r--r--cloudinit/config/cc_apt_configure.py2
-rw-r--r--cloudinit/config/cc_bootcmd.py2
-rw-r--r--cloudinit/config/cc_ca_certs.py4
-rw-r--r--cloudinit/config/cc_chef.py6
-rw-r--r--cloudinit/config/cc_debug.py7
-rw-r--r--cloudinit/config/cc_landscape.py2
-rw-r--r--cloudinit/config/cc_mcollective.py15
-rw-r--r--cloudinit/config/cc_phone_home.py4
-rw-r--r--cloudinit/config/cc_puppet.py10
-rw-r--r--cloudinit/config/cc_resolv_conf.py4
-rw-r--r--cloudinit/config/cc_rightscale_userdata.py4
-rw-r--r--cloudinit/config/cc_runcmd.py2
-rw-r--r--cloudinit/config/cc_salt_minion.py2
-rw-r--r--cloudinit/config/cc_seed_random.py15
-rw-r--r--cloudinit/config/cc_set_passwords.py6
-rw-r--r--cloudinit/config/cc_ssh.py16
-rw-r--r--cloudinit/config/cc_ssh_authkey_fingerprints.py2
-rw-r--r--cloudinit/config/cc_write_files.py5
-rw-r--r--cloudinit/config/cc_yum_add_repo.py7
-rw-r--r--cloudinit/distros/__init__.py69
-rw-r--r--cloudinit/distros/arch.py4
-rw-r--r--cloudinit/distros/debian.py2
-rw-r--r--cloudinit/distros/freebsd.py12
-rw-r--r--cloudinit/distros/gentoo.py2
-rw-r--r--cloudinit/distros/net_util.py2
-rw-r--r--cloudinit/distros/parsers/hostname.py2
-rw-r--r--cloudinit/distros/parsers/hosts.py2
-rw-r--r--cloudinit/distros/parsers/resolv_conf.py2
-rw-r--r--cloudinit/distros/parsers/sys_conf.py5
-rw-r--r--cloudinit/distros/rhel.py2
-rw-r--r--cloudinit/distros/rhel_util.py4
-rw-r--r--cloudinit/distros/sles.py4
-rw-r--r--cloudinit/ec2_utils.py9
-rw-r--r--cloudinit/handlers/__init__.py14
-rw-r--r--cloudinit/handlers/boot_hook.py2
-rw-r--r--cloudinit/handlers/cloud_config.py2
-rw-r--r--cloudinit/handlers/shell_script.py2
-rw-r--r--cloudinit/handlers/upstart_job.py2
-rw-r--r--cloudinit/helpers.py12
-rw-r--r--cloudinit/log.py7
-rw-r--r--cloudinit/mergers/__init__.py4
-rw-r--r--cloudinit/mergers/m_dict.py4
-rw-r--r--cloudinit/mergers/m_list.py6
-rw-r--r--cloudinit/mergers/m_str.py10
-rw-r--r--cloudinit/netinfo.py4
-rw-r--r--cloudinit/signal_handler.py2
-rw-r--r--cloudinit/sources/DataSourceAltCloud.py8
-rw-r--r--cloudinit/sources/DataSourceAzure.py7
-rw-r--r--cloudinit/sources/DataSourceConfigDrive.py4
-rw-r--r--cloudinit/sources/DataSourceDigitalOcean.py9
-rw-r--r--cloudinit/sources/DataSourceEc2.py4
-rw-r--r--cloudinit/sources/DataSourceMAAS.py48
-rw-r--r--cloudinit/sources/DataSourceOVF.py6
-rw-r--r--cloudinit/sources/DataSourceOpenNebula.py12
-rw-r--r--cloudinit/sources/DataSourceSmartOS.py21
-rw-r--r--cloudinit/sources/__init__.py10
-rw-r--r--cloudinit/sources/helpers/openstack.py10
-rw-r--r--cloudinit/ssh_util.py6
-rw-r--r--cloudinit/stages.py23
-rw-r--r--cloudinit/templater.py2
-rw-r--r--cloudinit/type_utils.py32
-rw-r--r--cloudinit/url_helper.py22
-rw-r--r--cloudinit/user_data.py10
-rw-r--r--cloudinit/util.py204
-rwxr-xr-xpackages/bddeb101
-rwxr-xr-xpackages/brpm2
-rw-r--r--packages/debian/changelog.in2
-rw-r--r--packages/debian/control.in29
-rw-r--r--packages/debian/copyright2
-rwxr-xr-xpackages/debian/rules.in (renamed from packages/debian/rules)8
-rw-r--r--requirements.txt6
-rwxr-xr-xsetup.py14
-rw-r--r--templates/resolv.conf.tmpl2
-rw-r--r--test-requirements.txt3
-rw-r--r--tests/unittests/helpers.py121
-rw-r--r--tests/unittests/test__init__.py238
-rw-r--r--tests/unittests/test_builtin_handlers.py39
-rw-r--r--tests/unittests/test_cs_util.py39
-rw-r--r--tests/unittests/test_data.py144
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py6
-rw-r--r--tests/unittests/test_datasource/test_azure.py96
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py1
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py107
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py7
-rw-r--r--tests/unittests/test_datasource/test_gce.py4
-rw-r--r--tests/unittests/test_datasource/test_maas.py76
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py57
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py23
-rw-r--r--tests/unittests/test_datasource/test_openstack.py7
-rw-r--r--tests/unittests/test_datasource/test_smartos.py24
-rw-r--r--tests/unittests/test_distros/test_generic.py6
-rw-r--r--tests/unittests/test_distros/test_hostname.py4
-rw-r--r--tests/unittests/test_distros/test_hosts.py4
-rw-r--r--tests/unittests/test_distros/test_netconfig.py273
-rw-r--r--tests/unittests/test_distros/test_resolv.py5
-rw-r--r--tests/unittests/test_distros/test_sysconfig.py5
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py7
-rw-r--r--tests/unittests/test_filters/test_launch_index.py8
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py20
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py254
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py18
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py5
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py110
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py11
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py15
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py11
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py11
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py12
-rw-r--r--tests/unittests/test_merging.py16
-rw-r--r--tests/unittests/test_pathprefix2dict.py12
-rw-r--r--tests/unittests/test_runs/test_merge_run.py8
-rw-r--r--tests/unittests/test_runs/test_simple_run.py11
-rw-r--r--tests/unittests/test_templating.py13
-rw-r--r--tests/unittests/test_util.py73
-rwxr-xr-xtools/ccfg-merge-debug4
-rwxr-xr-xtools/read-dependencies10
-rw-r--r--tox.ini23
121 files changed, 1624 insertions, 1243 deletions
diff --git a/.bzrignore b/.bzrignore
new file mode 100644
index 00000000..926e4581
--- /dev/null
+++ b/.bzrignore
@@ -0,0 +1,4 @@
+.tox
+dist
+cloud_init.egg-info
+__pycache__
diff --git a/ChangeLog b/ChangeLog
index abcadb4d..bafeea3d 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -21,6 +21,7 @@
- hostname: on first boot apply hostname to be same as is written for
persistent hostname. (LP: #1246485)
- remove usage of dmidecode on linux in favor of /sys interface [Ben Howard]
+ - python3 support [Barry Warsaw] (LP: #1247132)
0.7.6:
- open 0.7.6
- Enable vendordata on CloudSigma datasource (LP: #1303986)
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 00000000..90f6c7d5
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,8 @@
+include *.py MANIFEST.in ChangeLog
+global-include *.txt *.rst *.ini *.in *.conf *.cfg *.sh
+graft tools
+prune build
+prune dist
+prune .tox
+prune .bzr
+exclude .bzrignore
diff --git a/bin/cloud-init b/bin/cloud-init
index 866f8ca4..6c83c2e7 100755
--- a/bin/cloud-init
+++ b/bin/cloud-init
@@ -428,7 +428,7 @@ def atomic_write_json(path, data):
try:
tf = tempfile.NamedTemporaryFile(dir=os.path.dirname(path),
delete=False)
- tf.write(json.dumps(data, indent=1) + "\n")
+ tf.write(util.encode_text(json.dumps(data, indent=1) + "\n"))
tf.close()
os.rename(tf.name, path)
except Exception as e:
diff --git a/cloudinit/config/cc_apt_configure.py b/cloudinit/config/cc_apt_configure.py
index f10b76a3..de72903f 100644
--- a/cloudinit/config/cc_apt_configure.py
+++ b/cloudinit/config/cc_apt_configure.py
@@ -126,7 +126,7 @@ def mirror2lists_fileprefix(mirror):
def rename_apt_lists(old_mirrors, new_mirrors, lists_d="/var/lib/apt/lists"):
- for (name, omirror) in old_mirrors.iteritems():
+ for (name, omirror) in old_mirrors.items():
nmirror = new_mirrors.get(name)
if not nmirror:
continue
diff --git a/cloudinit/config/cc_bootcmd.py b/cloudinit/config/cc_bootcmd.py
index 3ac22967..a295cc4e 100644
--- a/cloudinit/config/cc_bootcmd.py
+++ b/cloudinit/config/cc_bootcmd.py
@@ -36,7 +36,7 @@ def handle(name, cfg, cloud, log, _args):
with util.ExtendedTemporaryFile(suffix=".sh") as tmpf:
try:
content = util.shellify(cfg["bootcmd"])
- tmpf.write(content)
+ tmpf.write(util.encode_text(content))
tmpf.flush()
except:
util.logexc(log, "Failed to shellify bootcmd")
diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py
index 4f2a46a1..8248b020 100644
--- a/cloudinit/config/cc_ca_certs.py
+++ b/cloudinit/config/cc_ca_certs.py
@@ -44,7 +44,7 @@ def add_ca_certs(certs):
if certs:
# First ensure they are strings...
cert_file_contents = "\n".join([str(c) for c in certs])
- util.write_file(CA_CERT_FULL_PATH, cert_file_contents, mode=0644)
+ util.write_file(CA_CERT_FULL_PATH, cert_file_contents, mode=0o644)
# Append cert filename to CA_CERT_CONFIG file.
# We have to strip the content because blank lines in the file
@@ -63,7 +63,7 @@ def remove_default_ca_certs():
"""
util.delete_dir_contents(CA_CERT_PATH)
util.delete_dir_contents(CA_CERT_SYSTEM_PATH)
- util.write_file(CA_CERT_CONFIG, "", mode=0644)
+ util.write_file(CA_CERT_CONFIG, "", mode=0o644)
debconf_sel = "ca-certificates ca-certificates/trust_new_crts select no"
util.subp(('debconf-set-selections', '-'), debconf_sel)
diff --git a/cloudinit/config/cc_chef.py b/cloudinit/config/cc_chef.py
index fc837363..e18c5405 100644
--- a/cloudinit/config/cc_chef.py
+++ b/cloudinit/config/cc_chef.py
@@ -76,6 +76,8 @@ from cloudinit import templater
from cloudinit import url_helper
from cloudinit import util
+import six
+
RUBY_VERSION_DEFAULT = "1.8"
CHEF_DIRS = tuple([
@@ -261,7 +263,7 @@ def run_chef(chef_cfg, log):
cmd_args = chef_cfg['exec_arguments']
if isinstance(cmd_args, (list, tuple)):
cmd.extend(cmd_args)
- elif isinstance(cmd_args, (str, basestring)):
+ elif isinstance(cmd_args, six.string_types):
cmd.append(cmd_args)
else:
log.warn("Unknown type %s provided for chef"
@@ -300,7 +302,7 @@ def install_chef(cloud, chef_cfg, log):
with util.tempdir() as tmpd:
# Use tmpdir over tmpfile to avoid 'text file busy' on execute
tmpf = "%s/chef-omnibus-install" % tmpd
- util.write_file(tmpf, str(content), mode=0700)
+ util.write_file(tmpf, content, mode=0o700)
util.subp([tmpf], capture=False)
else:
log.warn("Unknown chef install type '%s'", install_type)
diff --git a/cloudinit/config/cc_debug.py b/cloudinit/config/cc_debug.py
index 8c489426..bdc32fe6 100644
--- a/cloudinit/config/cc_debug.py
+++ b/cloudinit/config/cc_debug.py
@@ -34,7 +34,8 @@ It can be configured with the following option structure::
"""
import copy
-from StringIO import StringIO
+
+from six import StringIO
from cloudinit import type_utils
from cloudinit import util
@@ -77,7 +78,7 @@ def handle(name, cfg, cloud, log, args):
dump_cfg = copy.deepcopy(cfg)
for k in SKIP_KEYS:
dump_cfg.pop(k, None)
- all_keys = list(dump_cfg.keys())
+ all_keys = list(dump_cfg)
for k in all_keys:
if k.startswith("_"):
dump_cfg.pop(k, None)
@@ -103,6 +104,6 @@ def handle(name, cfg, cloud, log, args):
line = "ci-info: %s\n" % (line)
content_to_file.append(line)
if out_file:
- util.write_file(out_file, "".join(content_to_file), 0644, "w")
+ util.write_file(out_file, "".join(content_to_file), 0o644, "w")
else:
util.multi_log("".join(content_to_file), console=True, stderr=False)
diff --git a/cloudinit/config/cc_landscape.py b/cloudinit/config/cc_landscape.py
index 8a709677..0b9d846e 100644
--- a/cloudinit/config/cc_landscape.py
+++ b/cloudinit/config/cc_landscape.py
@@ -20,7 +20,7 @@
import os
-from StringIO import StringIO
+from six import StringIO
from configobj import ConfigObj
diff --git a/cloudinit/config/cc_mcollective.py b/cloudinit/config/cc_mcollective.py
index b670390d..425420ae 100644
--- a/cloudinit/config/cc_mcollective.py
+++ b/cloudinit/config/cc_mcollective.py
@@ -19,7 +19,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+import six
+from six import StringIO
# Used since this can maintain comments
# and doesn't need a top level section
@@ -51,17 +52,17 @@ def handle(name, cfg, cloud, log, _args):
# original file in order to be able to mix the rest up
mcollective_config = ConfigObj(SERVER_CFG)
# See: http://tiny.cc/jh9agw
- for (cfg_name, cfg) in mcollective_cfg['conf'].iteritems():
+ for (cfg_name, cfg) in mcollective_cfg['conf'].items():
if cfg_name == 'public-cert':
- util.write_file(PUBCERT_FILE, cfg, mode=0644)
+ util.write_file(PUBCERT_FILE, cfg, mode=0o644)
mcollective_config['plugin.ssl_server_public'] = PUBCERT_FILE
mcollective_config['securityprovider'] = 'ssl'
elif cfg_name == 'private-cert':
- util.write_file(PRICERT_FILE, cfg, mode=0600)
+ util.write_file(PRICERT_FILE, cfg, mode=0o600)
mcollective_config['plugin.ssl_server_private'] = PRICERT_FILE
mcollective_config['securityprovider'] = 'ssl'
else:
- if isinstance(cfg, (basestring, str)):
+ if isinstance(cfg, six.string_types):
# Just set it in the 'main' section
mcollective_config[cfg_name] = cfg
elif isinstance(cfg, (dict)):
@@ -69,7 +70,7 @@ def handle(name, cfg, cloud, log, _args):
# if it is needed and then add/or create items as needed
if cfg_name not in mcollective_config.sections:
mcollective_config[cfg_name] = {}
- for (o, v) in cfg.iteritems():
+ for (o, v) in cfg.items():
mcollective_config[cfg_name][o] = v
else:
# Otherwise just try to convert it to a string
@@ -81,7 +82,7 @@ def handle(name, cfg, cloud, log, _args):
contents = StringIO()
mcollective_config.write(contents)
contents = contents.getvalue()
- util.write_file(SERVER_CFG, contents, mode=0644)
+ util.write_file(SERVER_CFG, contents, mode=0o644)
# Start mcollective
util.subp(['service', 'mcollective', 'start'], capture=False)
diff --git a/cloudinit/config/cc_phone_home.py b/cloudinit/config/cc_phone_home.py
index 5bc68b83..18a7ddad 100644
--- a/cloudinit/config/cc_phone_home.py
+++ b/cloudinit/config/cc_phone_home.py
@@ -81,7 +81,7 @@ def handle(name, cfg, cloud, log, args):
'pub_key_ecdsa': '/etc/ssh/ssh_host_ecdsa_key.pub',
}
- for (n, path) in pubkeys.iteritems():
+ for (n, path) in pubkeys.items():
try:
all_keys[n] = util.load_file(path)
except:
@@ -99,7 +99,7 @@ def handle(name, cfg, cloud, log, args):
# Get them read to be posted
real_submit_keys = {}
- for (k, v) in submit_keys.iteritems():
+ for (k, v) in submit_keys.items():
if v is None:
real_submit_keys[k] = 'N/A'
else:
diff --git a/cloudinit/config/cc_puppet.py b/cloudinit/config/cc_puppet.py
index 471a1a8a..4501598e 100644
--- a/cloudinit/config/cc_puppet.py
+++ b/cloudinit/config/cc_puppet.py
@@ -18,7 +18,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+from six import StringIO
import os
import socket
@@ -81,22 +81,22 @@ def handle(name, cfg, cloud, log, _args):
cleaned_contents = '\n'.join(cleaned_lines)
puppet_config.readfp(StringIO(cleaned_contents),
filename=PUPPET_CONF_PATH)
- for (cfg_name, cfg) in puppet_cfg['conf'].iteritems():
+ for (cfg_name, cfg) in puppet_cfg['conf'].items():
# Cert configuration is a special case
# Dump the puppet master ca certificate in the correct place
if cfg_name == 'ca_cert':
# Puppet ssl sub-directory isn't created yet
# Create it with the proper permissions and ownership
- util.ensure_dir(PUPPET_SSL_DIR, 0771)
+ util.ensure_dir(PUPPET_SSL_DIR, 0o771)
util.chownbyname(PUPPET_SSL_DIR, 'puppet', 'root')
util.ensure_dir(PUPPET_SSL_CERT_DIR)
util.chownbyname(PUPPET_SSL_CERT_DIR, 'puppet', 'root')
- util.write_file(PUPPET_SSL_CERT_PATH, str(cfg))
+ util.write_file(PUPPET_SSL_CERT_PATH, cfg)
util.chownbyname(PUPPET_SSL_CERT_PATH, 'puppet', 'root')
else:
# Iterate throug the config items, we'll use ConfigParser.set
# to overwrite or create new items as needed
- for (o, v) in cfg.iteritems():
+ for (o, v) in cfg.items():
if o == 'certname':
# Expand %f as the fqdn
# TODO(harlowja) should this use the cloud fqdn??
diff --git a/cloudinit/config/cc_resolv_conf.py b/cloudinit/config/cc_resolv_conf.py
index bbaa6c63..71d9e3a7 100644
--- a/cloudinit/config/cc_resolv_conf.py
+++ b/cloudinit/config/cc_resolv_conf.py
@@ -66,8 +66,8 @@ def generate_resolv_conf(template_fn, params, target_fname="/etc/resolv.conf"):
false_flags = []
if 'options' in params:
- for key, val in params['options'].iteritems():
- if type(val) == bool:
+ for key, val in params['options'].items():
+ if isinstance(val, bool):
if val:
flags.append(key)
else:
diff --git a/cloudinit/config/cc_rightscale_userdata.py b/cloudinit/config/cc_rightscale_userdata.py
index 7d2ec10a..24880d13 100644
--- a/cloudinit/config/cc_rightscale_userdata.py
+++ b/cloudinit/config/cc_rightscale_userdata.py
@@ -41,7 +41,7 @@ from cloudinit.settings import PER_INSTANCE
from cloudinit import url_helper as uhelp
from cloudinit import util
-from urlparse import parse_qs
+from six.moves.urllib_parse import parse_qs
frequency = PER_INSTANCE
@@ -82,7 +82,7 @@ def handle(name, _cfg, cloud, log, _args):
resp = uhelp.readurl(url)
# Ensure its a valid http response (and something gotten)
if resp.ok() and resp.contents:
- util.write_file(fname, str(resp), mode=0700)
+ util.write_file(fname, resp, mode=0o700)
wrote_fns.append(fname)
except Exception as e:
captured_excps.append(e)
diff --git a/cloudinit/config/cc_runcmd.py b/cloudinit/config/cc_runcmd.py
index 598c3a3e..66dc3363 100644
--- a/cloudinit/config/cc_runcmd.py
+++ b/cloudinit/config/cc_runcmd.py
@@ -33,6 +33,6 @@ def handle(name, cfg, cloud, log, _args):
cmd = cfg["runcmd"]
try:
content = util.shellify(cmd)
- util.write_file(out_fn, content, 0700)
+ util.write_file(out_fn, content, 0o700)
except:
util.logexc(log, "Failed to shellify %s into file %s", cmd, out_fn)
diff --git a/cloudinit/config/cc_salt_minion.py b/cloudinit/config/cc_salt_minion.py
index 53013dcb..f5786a31 100644
--- a/cloudinit/config/cc_salt_minion.py
+++ b/cloudinit/config/cc_salt_minion.py
@@ -47,7 +47,7 @@ def handle(name, cfg, cloud, log, _args):
# ... copy the key pair if specified
if 'public_key' in salt_cfg and 'private_key' in salt_cfg:
pki_dir = salt_cfg.get('pki_dir', '/etc/salt/pki')
- with util.umask(077):
+ with util.umask(0o77):
util.ensure_dir(pki_dir)
pub_name = os.path.join(pki_dir, 'minion.pub')
pem_name = os.path.join(pki_dir, 'minion.pem')
diff --git a/cloudinit/config/cc_seed_random.py b/cloudinit/config/cc_seed_random.py
index 49a6b3e8..3288a853 100644
--- a/cloudinit/config/cc_seed_random.py
+++ b/cloudinit/config/cc_seed_random.py
@@ -21,7 +21,8 @@
import base64
import os
-from StringIO import StringIO
+
+from six import BytesIO
from cloudinit.settings import PER_INSTANCE
from cloudinit import log as logging
@@ -33,13 +34,13 @@ LOG = logging.getLogger(__name__)
def _decode(data, encoding=None):
if not data:
- return ''
+ return b''
if not encoding or encoding.lower() in ['raw']:
- return data
+ return util.encode_text(data)
elif encoding.lower() in ['base64', 'b64']:
return base64.b64decode(data)
elif encoding.lower() in ['gzip', 'gz']:
- return util.decomp_gzip(data, quiet=False)
+ return util.decomp_gzip(data, quiet=False, decode=None)
else:
raise IOError("Unknown random_seed encoding: %s" % (encoding))
@@ -64,9 +65,9 @@ def handle_random_seed_command(command, required, env=None):
def handle(name, cfg, cloud, log, _args):
mycfg = cfg.get('random_seed', {})
seed_path = mycfg.get('file', '/dev/urandom')
- seed_data = mycfg.get('data', '')
+ seed_data = mycfg.get('data', b'')
- seed_buf = StringIO()
+ seed_buf = BytesIO()
if seed_data:
seed_buf.write(_decode(seed_data, encoding=mycfg.get('encoding')))
@@ -74,7 +75,7 @@ def handle(name, cfg, cloud, log, _args):
# openstack meta_data.json
metadata = cloud.datasource.metadata
if metadata and 'random_seed' in metadata:
- seed_buf.write(metadata['random_seed'])
+ seed_buf.write(util.encode_text(metadata['random_seed']))
seed_data = seed_buf.getvalue()
if len(seed_data):
diff --git a/cloudinit/config/cc_set_passwords.py b/cloudinit/config/cc_set_passwords.py
index 4ca85e21..0c315361 100644
--- a/cloudinit/config/cc_set_passwords.py
+++ b/cloudinit/config/cc_set_passwords.py
@@ -28,11 +28,11 @@ from cloudinit import distros as ds
from cloudinit import ssh_util
from cloudinit import util
-from string import letters, digits
+from string import ascii_letters, digits
# We are removing certain 'painful' letters/numbers
-PW_SET = (letters.translate(None, 'loLOI') +
- digits.translate(None, '01'))
+PW_SET = (''.join([x for x in ascii_letters + digits
+ if x not in 'loLOI01']))
def handle(_name, cfg, cloud, log, args):
diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py
index 4c76581c..ab6940fa 100644
--- a/cloudinit/config/cc_ssh.py
+++ b/cloudinit/config/cc_ssh.py
@@ -34,12 +34,12 @@ DISABLE_ROOT_OPTS = ("no-port-forwarding,no-agent-forwarding,"
"rather than the user \\\"root\\\".\';echo;sleep 10\"")
KEY_2_FILE = {
- "rsa_private": ("/etc/ssh/ssh_host_rsa_key", 0600),
- "rsa_public": ("/etc/ssh/ssh_host_rsa_key.pub", 0644),
- "dsa_private": ("/etc/ssh/ssh_host_dsa_key", 0600),
- "dsa_public": ("/etc/ssh/ssh_host_dsa_key.pub", 0644),
- "ecdsa_private": ("/etc/ssh/ssh_host_ecdsa_key", 0600),
- "ecdsa_public": ("/etc/ssh/ssh_host_ecdsa_key.pub", 0644),
+ "rsa_private": ("/etc/ssh/ssh_host_rsa_key", 0o600),
+ "rsa_public": ("/etc/ssh/ssh_host_rsa_key.pub", 0o644),
+ "dsa_private": ("/etc/ssh/ssh_host_dsa_key", 0o600),
+ "dsa_public": ("/etc/ssh/ssh_host_dsa_key.pub", 0o644),
+ "ecdsa_private": ("/etc/ssh/ssh_host_ecdsa_key", 0o600),
+ "ecdsa_public": ("/etc/ssh/ssh_host_ecdsa_key.pub", 0o644),
}
PRIV_2_PUB = {
@@ -68,13 +68,13 @@ def handle(_name, cfg, cloud, log, _args):
if "ssh_keys" in cfg:
# if there are keys in cloud-config, use them
- for (key, val) in cfg["ssh_keys"].iteritems():
+ for (key, val) in cfg["ssh_keys"].items():
if key in KEY_2_FILE:
tgt_fn = KEY_2_FILE[key][0]
tgt_perms = KEY_2_FILE[key][1]
util.write_file(tgt_fn, val, tgt_perms)
- for (priv, pub) in PRIV_2_PUB.iteritems():
+ for (priv, pub) in PRIV_2_PUB.items():
if pub in cfg['ssh_keys'] or priv not in cfg['ssh_keys']:
continue
pair = (KEY_2_FILE[priv][0], KEY_2_FILE[pub][0])
diff --git a/cloudinit/config/cc_ssh_authkey_fingerprints.py b/cloudinit/config/cc_ssh_authkey_fingerprints.py
index 51580633..6ce831bc 100644
--- a/cloudinit/config/cc_ssh_authkey_fingerprints.py
+++ b/cloudinit/config/cc_ssh_authkey_fingerprints.py
@@ -32,7 +32,7 @@ from cloudinit import util
def _split_hash(bin_hash):
split_up = []
- for i in xrange(0, len(bin_hash), 2):
+ for i in range(0, len(bin_hash), 2):
split_up.append(bin_hash[i:i + 2])
return split_up
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py
index a73d6f4e..4b03ea91 100644
--- a/cloudinit/config/cc_write_files.py
+++ b/cloudinit/config/cc_write_files.py
@@ -18,6 +18,7 @@
import base64
import os
+import six
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
@@ -25,7 +26,7 @@ from cloudinit import util
frequency = PER_INSTANCE
DEFAULT_OWNER = "root:root"
-DEFAULT_PERMS = 0644
+DEFAULT_PERMS = 0o644
UNKNOWN_ENC = 'text/plain'
@@ -79,7 +80,7 @@ def write_files(name, files, log):
def decode_perms(perm, default, log):
try:
- if isinstance(perm, (int, long, float)):
+ if isinstance(perm, six.integer_types + (float,)):
# Just 'downcast' it (if a float)
return int(perm)
else:
diff --git a/cloudinit/config/cc_yum_add_repo.py b/cloudinit/config/cc_yum_add_repo.py
index 0d836f28..3b821af9 100644
--- a/cloudinit/config/cc_yum_add_repo.py
+++ b/cloudinit/config/cc_yum_add_repo.py
@@ -18,9 +18,10 @@
import os
-from cloudinit import util
-
import configobj
+import six
+
+from cloudinit import util
def _canonicalize_id(repo_id):
@@ -37,7 +38,7 @@ def _format_repo_value(val):
# Can handle 'lists' in certain cases
# See: http://bit.ly/Qqrf1t
return "\n ".join([_format_repo_value(v) for v in val])
- if not isinstance(val, (basestring, str)):
+ if not isinstance(val, six.string_types):
return str(val)
return val
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index 5eab780b..ab874b45 100644
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -21,10 +21,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+import six
+from six import StringIO
import abc
-import itertools
import os
import re
@@ -36,6 +36,7 @@ from cloudinit import util
from cloudinit.distros.parsers import hosts
+
OSFAMILIES = {
'debian': ['debian', 'ubuntu'],
'redhat': ['fedora', 'rhel'],
@@ -272,7 +273,7 @@ class Distro(object):
if header:
contents.write("%s\n" % (header))
contents.write("%s\n" % (eh))
- util.write_file(self.hosts_fn, contents.getvalue(), mode=0644)
+ util.write_file(self.hosts_fn, contents.getvalue(), mode=0o644)
def _bring_up_interface(self, device_name):
cmd = ['ifup', device_name]
@@ -334,7 +335,7 @@ class Distro(object):
redact_opts = ['passwd']
# Check the values and create the command
- for key, val in kwargs.iteritems():
+ for key, val in kwargs.items():
if key in adduser_opts and val and isinstance(val, str):
adduser_cmd.extend([adduser_opts[key], val])
@@ -393,7 +394,7 @@ class Distro(object):
if 'ssh_authorized_keys' in kwargs:
# Try to handle this in a smart manner.
keys = kwargs['ssh_authorized_keys']
- if isinstance(keys, (basestring, str)):
+ if isinstance(keys, six.string_types):
keys = [keys]
if isinstance(keys, dict):
keys = list(keys.values())
@@ -468,7 +469,7 @@ class Distro(object):
util.make_header(base="added"),
"#includedir %s" % (path), '']
sudoers_contents = "\n".join(lines)
- util.write_file(sudo_base, sudoers_contents, 0440)
+ util.write_file(sudo_base, sudoers_contents, 0o440)
else:
lines = ['', util.make_header(base="added"),
"#includedir %s" % (path), '']
@@ -478,7 +479,7 @@ class Distro(object):
except IOError as e:
util.logexc(LOG, "Failed to write %s", sudo_base)
raise e
- util.ensure_dir(path, 0750)
+ util.ensure_dir(path, 0o750)
def write_sudo_rules(self, user, rules, sudo_file=None):
if not sudo_file:
@@ -491,7 +492,7 @@ class Distro(object):
if isinstance(rules, (list, tuple)):
for rule in rules:
lines.append("%s %s" % (user, rule))
- elif isinstance(rules, (basestring, str)):
+ elif isinstance(rules, six.string_types):
lines.append("%s %s" % (user, rules))
else:
msg = "Can not create sudoers rule addition with type %r"
@@ -506,7 +507,7 @@ class Distro(object):
content,
]
try:
- util.write_file(sudo_file, "\n".join(contents), 0440)
+ util.write_file(sudo_file, "\n".join(contents), 0o440)
except IOError as e:
util.logexc(LOG, "Failed to write sudoers file %s", sudo_file)
raise e
@@ -561,10 +562,10 @@ def _get_package_mirror_info(mirror_info, availability_zone=None,
subst['ec2_region'] = "%s" % availability_zone[0:-1]
results = {}
- for (name, mirror) in mirror_info.get('failsafe', {}).iteritems():
+ for (name, mirror) in mirror_info.get('failsafe', {}).items():
results[name] = mirror
- for (name, searchlist) in mirror_info.get('search', {}).iteritems():
+ for (name, searchlist) in mirror_info.get('search', {}).items():
mirrors = []
for tmpl in searchlist:
try:
@@ -604,30 +605,30 @@ def _get_arch_package_mirror_info(package_mirrors, arch):
# is the standard form used in the rest
# of cloud-init
def _normalize_groups(grp_cfg):
- if isinstance(grp_cfg, (str, basestring)):
+ if isinstance(grp_cfg, six.string_types):
grp_cfg = grp_cfg.strip().split(",")
- if isinstance(grp_cfg, (list)):
+ if isinstance(grp_cfg, list):
c_grp_cfg = {}
for i in grp_cfg:
- if isinstance(i, (dict)):
+ if isinstance(i, dict):
for k, v in i.items():
if k not in c_grp_cfg:
- if isinstance(v, (list)):
+ if isinstance(v, list):
c_grp_cfg[k] = list(v)
- elif isinstance(v, (basestring, str)):
+ elif isinstance(v, six.string_types):
c_grp_cfg[k] = [v]
else:
raise TypeError("Bad group member type %s" %
type_utils.obj_name(v))
else:
- if isinstance(v, (list)):
+ if isinstance(v, list):
c_grp_cfg[k].extend(v)
- elif isinstance(v, (basestring, str)):
+ elif isinstance(v, six.string_types):
c_grp_cfg[k].append(v)
else:
raise TypeError("Bad group member type %s" %
type_utils.obj_name(v))
- elif isinstance(i, (str, basestring)):
+ elif isinstance(i, six.string_types):
if i not in c_grp_cfg:
c_grp_cfg[i] = []
else:
@@ -635,7 +636,7 @@ def _normalize_groups(grp_cfg):
type_utils.obj_name(i))
grp_cfg = c_grp_cfg
groups = {}
- if isinstance(grp_cfg, (dict)):
+ if isinstance(grp_cfg, dict):
for (grp_name, grp_members) in grp_cfg.items():
groups[grp_name] = util.uniq_merge_sorted(grp_members)
else:
@@ -661,29 +662,29 @@ def _normalize_groups(grp_cfg):
# entry 'default' which will be marked as true
# all other users will be marked as false.
def _normalize_users(u_cfg, def_user_cfg=None):
- if isinstance(u_cfg, (dict)):
+ if isinstance(u_cfg, dict):
ad_ucfg = []
for (k, v) in u_cfg.items():
- if isinstance(v, (bool, int, basestring, str, float)):
+ if isinstance(v, (bool, int, float) + six.string_types):
if util.is_true(v):
ad_ucfg.append(str(k))
- elif isinstance(v, (dict)):
+ elif isinstance(v, dict):
v['name'] = k
ad_ucfg.append(v)
else:
raise TypeError(("Unmappable user value type %s"
" for key %s") % (type_utils.obj_name(v), k))
u_cfg = ad_ucfg
- elif isinstance(u_cfg, (str, basestring)):
+ elif isinstance(u_cfg, six.string_types):
u_cfg = util.uniq_merge_sorted(u_cfg)
users = {}
for user_config in u_cfg:
- if isinstance(user_config, (str, basestring, list)):
+ if isinstance(user_config, (list,) + six.string_types):
for u in util.uniq_merge(user_config):
if u and u not in users:
users[u] = {}
- elif isinstance(user_config, (dict)):
+ elif isinstance(user_config, dict):
if 'name' in user_config:
n = user_config.pop('name')
prev_config = users.get(n) or {}
@@ -784,11 +785,11 @@ def normalize_users_groups(cfg, distro):
old_user = cfg['user']
# Translate it into the format that is more useful
# going forward
- if isinstance(old_user, (basestring, str)):
+ if isinstance(old_user, six.string_types):
old_user = {
'name': old_user,
}
- if not isinstance(old_user, (dict)):
+ if not isinstance(old_user, dict):
LOG.warn(("Format for 'user' key must be a string or "
"dictionary and not %s"), type_utils.obj_name(old_user))
old_user = {}
@@ -813,7 +814,7 @@ def normalize_users_groups(cfg, distro):
default_user_config = util.mergemanydict([old_user, distro_user_config])
base_users = cfg.get('users', [])
- if not isinstance(base_users, (list, dict, str, basestring)):
+ if not isinstance(base_users, (list, dict) + six.string_types):
LOG.warn(("Format for 'users' key must be a comma separated string"
" or a dictionary or a list and not %s"),
type_utils.obj_name(base_users))
@@ -822,12 +823,12 @@ def normalize_users_groups(cfg, distro):
if old_user:
# Ensure that when user: is provided that this user
# always gets added (as the default user)
- if isinstance(base_users, (list)):
+ if isinstance(base_users, list):
# Just add it on at the end...
base_users.append({'name': 'default'})
- elif isinstance(base_users, (dict)):
+ elif isinstance(base_users, dict):
base_users['default'] = dict(base_users).get('default', True)
- elif isinstance(base_users, (str, basestring)):
+ elif isinstance(base_users, six.string_types):
# Just append it on to be re-parsed later
base_users += ",default"
@@ -852,11 +853,11 @@ def extract_default(users, default_name=None, default_config=None):
return config['default']
tmp_users = users.items()
- tmp_users = dict(itertools.ifilter(safe_find, tmp_users))
+ tmp_users = dict(filter(safe_find, tmp_users))
if not tmp_users:
return (default_name, default_config)
else:
- name = tmp_users.keys()[0]
+ name = list(tmp_users)[0]
config = tmp_users[name]
config.pop('default', None)
return (name, config)
diff --git a/cloudinit/distros/arch.py b/cloudinit/distros/arch.py
index 68bf1aab..45fcf26f 100644
--- a/cloudinit/distros/arch.py
+++ b/cloudinit/distros/arch.py
@@ -66,7 +66,7 @@ class Distro(distros.Distro):
settings, entries)
dev_names = entries.keys()
# Format for netctl
- for (dev, info) in entries.iteritems():
+ for (dev, info) in entries.items():
nameservers = []
net_fn = self.network_conf_dir + dev
net_cfg = {
@@ -129,7 +129,7 @@ class Distro(distros.Distro):
if not conf:
conf = HostnameConf('')
conf.set_hostname(your_hostname)
- util.write_file(out_fn, str(conf), 0644)
+ util.write_file(out_fn, conf, 0o644)
def _read_system_hostname(self):
sys_hostname = self._read_hostname(self.hostname_conf_fn)
diff --git a/cloudinit/distros/debian.py b/cloudinit/distros/debian.py
index b09eb094..6d3a82bf 100644
--- a/cloudinit/distros/debian.py
+++ b/cloudinit/distros/debian.py
@@ -97,7 +97,7 @@ class Distro(distros.Distro):
if not conf:
conf = HostnameConf('')
conf.set_hostname(your_hostname)
- util.write_file(out_fn, str(conf), 0644)
+ util.write_file(out_fn, str(conf), 0o644)
def _read_system_hostname(self):
sys_hostname = self._read_hostname(self.hostname_conf_fn)
diff --git a/cloudinit/distros/freebsd.py b/cloudinit/distros/freebsd.py
index f1b4a256..4c484639 100644
--- a/cloudinit/distros/freebsd.py
+++ b/cloudinit/distros/freebsd.py
@@ -16,7 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+import six
+from six import StringIO
import re
@@ -203,8 +204,9 @@ class Distro(distros.Distro):
redact_opts = ['passwd']
- for key, val in kwargs.iteritems():
- if key in adduser_opts and val and isinstance(val, basestring):
+ for key, val in kwargs.items():
+ if (key in adduser_opts and val
+ and isinstance(val, six.string_types)):
adduser_cmd.extend([adduser_opts[key], val])
# Redact certain fields from the logs
@@ -271,7 +273,7 @@ class Distro(distros.Distro):
nameservers = []
searchdomains = []
dev_names = entries.keys()
- for (device, info) in entries.iteritems():
+ for (device, info) in entries.items():
# Skip the loopback interface.
if device.startswith('lo'):
continue
@@ -323,7 +325,7 @@ class Distro(distros.Distro):
resolvconf.add_search_domain(domain)
except ValueError:
util.logexc(LOG, "Failed to add search domain %s", domain)
- util.write_file(self.resolv_conf_fn, str(resolvconf), 0644)
+ util.write_file(self.resolv_conf_fn, str(resolvconf), 0o644)
return dev_names
diff --git a/cloudinit/distros/gentoo.py b/cloudinit/distros/gentoo.py
index 09dd0d73..9e80583c 100644
--- a/cloudinit/distros/gentoo.py
+++ b/cloudinit/distros/gentoo.py
@@ -108,7 +108,7 @@ class Distro(distros.Distro):
if not conf:
conf = HostnameConf('')
conf.set_hostname(your_hostname)
- util.write_file(out_fn, str(conf), 0644)
+ util.write_file(out_fn, conf, 0o644)
def _read_system_hostname(self):
sys_hostname = self._read_hostname(self.hostname_conf_fn)
diff --git a/cloudinit/distros/net_util.py b/cloudinit/distros/net_util.py
index 8b28e2d1..cadfa6b6 100644
--- a/cloudinit/distros/net_util.py
+++ b/cloudinit/distros/net_util.py
@@ -103,7 +103,7 @@ def translate_network(settings):
consume[cmd] = args
# Check if anything left over to consume
absorb = False
- for (cmd, args) in consume.iteritems():
+ for (cmd, args) in consume.items():
if cmd == 'iface':
absorb = True
if absorb:
diff --git a/cloudinit/distros/parsers/hostname.py b/cloudinit/distros/parsers/hostname.py
index 617b3c36..84a1de42 100644
--- a/cloudinit/distros/parsers/hostname.py
+++ b/cloudinit/distros/parsers/hostname.py
@@ -16,7 +16,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+from six import StringIO
from cloudinit.distros.parsers import chop_comment
diff --git a/cloudinit/distros/parsers/hosts.py b/cloudinit/distros/parsers/hosts.py
index 94c97051..3c5498ee 100644
--- a/cloudinit/distros/parsers/hosts.py
+++ b/cloudinit/distros/parsers/hosts.py
@@ -16,7 +16,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+from six import StringIO
from cloudinit.distros.parsers import chop_comment
diff --git a/cloudinit/distros/parsers/resolv_conf.py b/cloudinit/distros/parsers/resolv_conf.py
index 5733c25a..8aee03a4 100644
--- a/cloudinit/distros/parsers/resolv_conf.py
+++ b/cloudinit/distros/parsers/resolv_conf.py
@@ -16,7 +16,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+from six import StringIO
from cloudinit import util
diff --git a/cloudinit/distros/parsers/sys_conf.py b/cloudinit/distros/parsers/sys_conf.py
index 20ca1871..d795e12f 100644
--- a/cloudinit/distros/parsers/sys_conf.py
+++ b/cloudinit/distros/parsers/sys_conf.py
@@ -16,7 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
+import six
+from six import StringIO
import pipes
import re
@@ -69,7 +70,7 @@ class SysConf(configobj.ConfigObj):
return out_contents.getvalue()
def _quote(self, value, multiline=False):
- if not isinstance(value, (str, basestring)):
+ if not isinstance(value, six.string_types):
raise ValueError('Value "%s" is not a string' % (value))
if len(value) == 0:
return ''
diff --git a/cloudinit/distros/rhel.py b/cloudinit/distros/rhel.py
index d9588632..7408989c 100644
--- a/cloudinit/distros/rhel.py
+++ b/cloudinit/distros/rhel.py
@@ -73,7 +73,7 @@ class Distro(distros.Distro):
searchservers = []
dev_names = entries.keys()
use_ipv6 = False
- for (dev, info) in entries.iteritems():
+ for (dev, info) in entries.items():
net_fn = self.network_script_tpl % (dev)
net_cfg = {
'DEVICE': dev,
diff --git a/cloudinit/distros/rhel_util.py b/cloudinit/distros/rhel_util.py
index 063d536e..84aad623 100644
--- a/cloudinit/distros/rhel_util.py
+++ b/cloudinit/distros/rhel_util.py
@@ -50,7 +50,7 @@ def update_sysconfig_file(fn, adjustments, allow_empty=False):
]
if not exists:
lines.insert(0, util.make_header())
- util.write_file(fn, "\n".join(lines) + "\n", 0644)
+ util.write_file(fn, "\n".join(lines) + "\n", 0o644)
# Helper function to read a RHEL/SUSE /etc/sysconfig/* file
@@ -86,4 +86,4 @@ def update_resolve_conf_file(fn, dns_servers, search_servers):
r_conf.add_search_domain(s)
except ValueError:
util.logexc(LOG, "Failed at adding search domain %s", s)
- util.write_file(fn, str(r_conf), 0644)
+ util.write_file(fn, r_conf, 0o644)
diff --git a/cloudinit/distros/sles.py b/cloudinit/distros/sles.py
index 43682a12..620c974c 100644
--- a/cloudinit/distros/sles.py
+++ b/cloudinit/distros/sles.py
@@ -62,7 +62,7 @@ class Distro(distros.Distro):
nameservers = []
searchservers = []
dev_names = entries.keys()
- for (dev, info) in entries.iteritems():
+ for (dev, info) in entries.items():
net_fn = self.network_script_tpl % (dev)
mode = info.get('auto')
if mode and mode.lower() == 'true':
@@ -113,7 +113,7 @@ class Distro(distros.Distro):
if not conf:
conf = HostnameConf('')
conf.set_hostname(hostname)
- util.write_file(out_fn, str(conf), 0644)
+ util.write_file(out_fn, str(conf), 0o644)
def _read_system_hostname(self):
host_fn = self.hostname_conf_fn
diff --git a/cloudinit/ec2_utils.py b/cloudinit/ec2_utils.py
index e69d06ff..e1ed4091 100644
--- a/cloudinit/ec2_utils.py
+++ b/cloudinit/ec2_utils.py
@@ -17,7 +17,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import functools
-import httplib
import json
from cloudinit import log as logging
@@ -25,7 +24,7 @@ from cloudinit import url_helper
from cloudinit import util
LOG = logging.getLogger(__name__)
-SKIP_USERDATA_CODES = frozenset([httplib.NOT_FOUND])
+SKIP_USERDATA_CODES = frozenset([url_helper.NOT_FOUND])
class MetadataLeafDecoder(object):
@@ -123,7 +122,7 @@ class MetadataMaterializer(object):
leaf_contents = {}
for (field, resource) in leaves.items():
leaf_url = url_helper.combine_url(base_url, resource)
- leaf_blob = str(self._caller(leaf_url))
+ leaf_blob = self._caller(leaf_url).contents
leaf_contents[field] = self._leaf_decoder(field, leaf_blob)
joined = {}
joined.update(child_contents)
@@ -160,7 +159,7 @@ def get_instance_userdata(api_version='latest',
timeout=timeout,
retries=retries,
exception_cb=exception_cb)
- user_data = str(response)
+ user_data = response.contents
except url_helper.UrlError as e:
if e.code not in SKIP_USERDATA_CODES:
util.logexc(LOG, "Failed fetching userdata from url %s", ud_url)
@@ -183,7 +182,7 @@ def get_instance_metadata(api_version='latest',
try:
response = caller(md_url)
- materializer = MetadataMaterializer(str(response),
+ materializer = MetadataMaterializer(response.contents,
md_url, caller,
leaf_decoder=leaf_decoder)
md = materializer.materialize()
diff --git a/cloudinit/handlers/__init__.py b/cloudinit/handlers/__init__.py
index 059d7495..6b7abbcd 100644
--- a/cloudinit/handlers/__init__.py
+++ b/cloudinit/handlers/__init__.py
@@ -22,6 +22,7 @@
import abc
import os
+import six
from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE, FREQUENCIES)
@@ -147,7 +148,7 @@ def walker_handle_handler(pdata, _ctype, _filename, payload):
if not modfname.endswith(".py"):
modfname = "%s.py" % (modfname)
# TODO(harlowja): Check if path exists??
- util.write_file(modfname, payload, 0600)
+ util.write_file(modfname, payload, 0o600)
handlers = pdata['handlers']
try:
mod = fixup_handler(importer.import_module(modname))
@@ -174,11 +175,11 @@ def _extract_first_or_bytes(blob, size):
def _escape_string(text):
try:
- return text.encode("string-escape")
- except TypeError:
+ return text.encode("string_escape")
+ except (LookupError, TypeError):
try:
- # Unicode doesn't support string-escape...
- return text.encode('unicode-escape')
+ # Unicode (and Python 3's str) doesn't support string_escape...
+ return text.encode('unicode_escape')
except TypeError:
# Give up...
pass
@@ -232,7 +233,8 @@ def walk(msg, callback, data):
headers = dict(part)
LOG.debug(headers)
headers['Content-Type'] = ctype
- callback(data, filename, part.get_payload(decode=True), headers)
+ payload = util.fully_decoded_payload(part)
+ callback(data, filename, payload, headers)
partnum = partnum + 1
diff --git a/cloudinit/handlers/boot_hook.py b/cloudinit/handlers/boot_hook.py
index 3a50cf87..a4ea47ac 100644
--- a/cloudinit/handlers/boot_hook.py
+++ b/cloudinit/handlers/boot_hook.py
@@ -50,7 +50,7 @@ class BootHookPartHandler(handlers.Handler):
filepath = os.path.join(self.boothook_dir, filename)
contents = util.strip_prefix_suffix(util.dos2unix(payload),
prefix=BOOTHOOK_PREFIX)
- util.write_file(filepath, contents.lstrip(), 0700)
+ util.write_file(filepath, contents.lstrip(), 0o700)
return filepath
def handle_part(self, data, ctype, filename, payload, frequency):
diff --git a/cloudinit/handlers/cloud_config.py b/cloudinit/handlers/cloud_config.py
index bf994e33..07b6d0e0 100644
--- a/cloudinit/handlers/cloud_config.py
+++ b/cloudinit/handlers/cloud_config.py
@@ -95,7 +95,7 @@ class CloudConfigPartHandler(handlers.Handler):
lines.append(util.yaml_dumps(self.cloud_buf))
else:
lines = []
- util.write_file(self.cloud_fn, "\n".join(lines), 0600)
+ util.write_file(self.cloud_fn, "\n".join(lines), 0o600)
def _extract_mergers(self, payload, headers):
merge_header_headers = ''
diff --git a/cloudinit/handlers/shell_script.py b/cloudinit/handlers/shell_script.py
index 9755ab05..b5087693 100644
--- a/cloudinit/handlers/shell_script.py
+++ b/cloudinit/handlers/shell_script.py
@@ -52,4 +52,4 @@ class ShellScriptPartHandler(handlers.Handler):
filename = util.clean_filename(filename)
payload = util.dos2unix(payload)
path = os.path.join(self.script_dir, filename)
- util.write_file(path, payload, 0700)
+ util.write_file(path, payload, 0o700)
diff --git a/cloudinit/handlers/upstart_job.py b/cloudinit/handlers/upstart_job.py
index 50d193c4..c5bea711 100644
--- a/cloudinit/handlers/upstart_job.py
+++ b/cloudinit/handlers/upstart_job.py
@@ -65,7 +65,7 @@ class UpstartJobPartHandler(handlers.Handler):
payload = util.dos2unix(payload)
path = os.path.join(self.upstart_dir, filename)
- util.write_file(path, payload, 0644)
+ util.write_file(path, payload, 0o644)
if SUITABLE_UPSTART:
util.subp(["initctl", "reload-configuration"], capture=False)
diff --git a/cloudinit/helpers.py b/cloudinit/helpers.py
index e701126e..5e99d185 100644
--- a/cloudinit/helpers.py
+++ b/cloudinit/helpers.py
@@ -23,10 +23,11 @@
from time import time
import contextlib
-import io
import os
-from ConfigParser import (NoSectionError, NoOptionError, RawConfigParser)
+import six
+from six.moves.configparser import (
+ NoSectionError, NoOptionError, RawConfigParser)
from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS, PER_ONCE,
CFG_ENV_NAME)
@@ -318,10 +319,7 @@ class ContentHandlers(object):
return self.registered[content_type]
def items(self):
- return self.registered.items()
-
- def iteritems(self):
- return self.registered.iteritems()
+ return list(self.registered.items())
class Paths(object):
@@ -449,7 +447,7 @@ class DefaultingConfigParser(RawConfigParser):
def stringify(self, header=None):
contents = ''
- with io.BytesIO() as outputstream:
+ with six.StringIO() as outputstream:
self.write(outputstream)
outputstream.flush()
contents = outputstream.getvalue()
diff --git a/cloudinit/log.py b/cloudinit/log.py
index 622c946c..3c79b9c9 100644
--- a/cloudinit/log.py
+++ b/cloudinit/log.py
@@ -28,7 +28,8 @@ import collections
import os
import sys
-from StringIO import StringIO
+import six
+from six import StringIO
# Logging levels for easy access
CRITICAL = logging.CRITICAL
@@ -72,13 +73,13 @@ def setupLogging(cfg=None):
log_cfgs = []
log_cfg = cfg.get('logcfg')
- if log_cfg and isinstance(log_cfg, (str, basestring)):
+ if log_cfg and isinstance(log_cfg, six.string_types):
# If there is a 'logcfg' entry in the config,
# respect it, it is the old keyname
log_cfgs.append(str(log_cfg))
elif "log_cfgs" in cfg:
for a_cfg in cfg['log_cfgs']:
- if isinstance(a_cfg, (basestring, str)):
+ if isinstance(a_cfg, six.string_types):
log_cfgs.append(a_cfg)
elif isinstance(a_cfg, (collections.Iterable)):
cfg_str = [str(c) for c in a_cfg]
diff --git a/cloudinit/mergers/__init__.py b/cloudinit/mergers/__init__.py
index 03aa1ee1..e13f55ac 100644
--- a/cloudinit/mergers/__init__.py
+++ b/cloudinit/mergers/__init__.py
@@ -18,6 +18,8 @@
import re
+import six
+
from cloudinit import importer
from cloudinit import log as logging
from cloudinit import type_utils
@@ -95,7 +97,7 @@ def dict_extract_mergers(config):
raw_mergers = config.pop('merge_type', None)
if raw_mergers is None:
return parsed_mergers
- if isinstance(raw_mergers, (str, basestring)):
+ if isinstance(raw_mergers, six.string_types):
return string_extract_mergers(raw_mergers)
for m in raw_mergers:
if isinstance(m, (dict)):
diff --git a/cloudinit/mergers/m_dict.py b/cloudinit/mergers/m_dict.py
index a16141fa..87cf1a72 100644
--- a/cloudinit/mergers/m_dict.py
+++ b/cloudinit/mergers/m_dict.py
@@ -16,6 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+
DEF_MERGE_TYPE = 'no_replace'
MERGE_TYPES = ('replace', DEF_MERGE_TYPE,)
@@ -57,7 +59,7 @@ class Merger(object):
return new_v
if isinstance(new_v, (list, tuple)) and self._recurse_array:
return self._merger.merge(old_v, new_v)
- if isinstance(new_v, (basestring)) and self._recurse_str:
+ if isinstance(new_v, six.string_types) and self._recurse_str:
return self._merger.merge(old_v, new_v)
if isinstance(new_v, (dict)) and self._recurse_dict:
return self._merger.merge(old_v, new_v)
diff --git a/cloudinit/mergers/m_list.py b/cloudinit/mergers/m_list.py
index 3b87b0fc..81e5c580 100644
--- a/cloudinit/mergers/m_list.py
+++ b/cloudinit/mergers/m_list.py
@@ -16,6 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+
DEF_MERGE_TYPE = 'replace'
MERGE_TYPES = ('append', 'prepend', DEF_MERGE_TYPE, 'no_replace')
@@ -73,7 +75,7 @@ class Merger(object):
return old_v
if isinstance(new_v, (list, tuple)) and self._recurse_array:
return self._merger.merge(old_v, new_v)
- if isinstance(new_v, (str, basestring)) and self._recurse_str:
+ if isinstance(new_v, six.string_types) and self._recurse_str:
return self._merger.merge(old_v, new_v)
if isinstance(new_v, (dict)) and self._recurse_dict:
return self._merger.merge(old_v, new_v)
@@ -82,6 +84,6 @@ class Merger(object):
# Ok now we are replacing same indexes
merged_list.extend(value)
common_len = min(len(merged_list), len(merge_with))
- for i in xrange(0, common_len):
+ for i in range(0, common_len):
merged_list[i] = merge_same_index(merged_list[i], merge_with[i])
return merged_list
diff --git a/cloudinit/mergers/m_str.py b/cloudinit/mergers/m_str.py
index e22ce28a..b00c4bf3 100644
--- a/cloudinit/mergers/m_str.py
+++ b/cloudinit/mergers/m_str.py
@@ -17,6 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+
class Merger(object):
def __init__(self, _merger, opts):
@@ -34,11 +36,11 @@ class Merger(object):
# perform the following action, if appending we will
# merge them together, otherwise we will just return value.
def _on_str(self, value, merge_with):
- if not isinstance(value, (basestring)):
+ if not isinstance(value, six.string_types):
return merge_with
if not self._append:
return merge_with
- if isinstance(value, unicode):
- return value + unicode(merge_with)
+ if isinstance(value, six.text_type):
+ return value + six.text_type(merge_with)
else:
- return value + str(merge_with)
+ return value + six.binary_type(merge_with)
diff --git a/cloudinit/netinfo.py b/cloudinit/netinfo.py
index fb40cc0d..e30d6fb5 100644
--- a/cloudinit/netinfo.py
+++ b/cloudinit/netinfo.py
@@ -87,7 +87,7 @@ def netdev_info(empty=""):
devs[curdev][target] = toks[i][len(field) + 1:]
if empty != "":
- for (_devname, dev) in devs.iteritems():
+ for (_devname, dev) in devs.items():
for field in dev:
if dev[field] == "":
dev[field] = empty
@@ -181,7 +181,7 @@ def netdev_pformat():
else:
fields = ['Device', 'Up', 'Address', 'Mask', 'Scope', 'Hw-Address']
tbl = PrettyTable(fields)
- for (dev, d) in netdev.iteritems():
+ for (dev, d) in netdev.items():
tbl.add_row([dev, d["up"], d["addr"], d["mask"], ".", d["hwaddr"]])
if d.get('addr6'):
tbl.add_row([dev, d["up"],
diff --git a/cloudinit/signal_handler.py b/cloudinit/signal_handler.py
index 40b0c94c..0d95f506 100644
--- a/cloudinit/signal_handler.py
+++ b/cloudinit/signal_handler.py
@@ -22,7 +22,7 @@ import inspect
import signal
import sys
-from StringIO import StringIO
+from six import StringIO
from cloudinit import log as logging
from cloudinit import util
diff --git a/cloudinit/sources/DataSourceAltCloud.py b/cloudinit/sources/DataSourceAltCloud.py
index 1b0f72a1..fb528ae5 100644
--- a/cloudinit/sources/DataSourceAltCloud.py
+++ b/cloudinit/sources/DataSourceAltCloud.py
@@ -200,11 +200,11 @@ class DataSourceAltCloud(sources.DataSource):
cmd = CMD_PROBE_FLOPPY
(cmd_out, _err) = util.subp(cmd)
LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out))
- except ProcessExecutionError, _err:
+ except ProcessExecutionError as _err:
util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd),
_err.message)
return False
- except OSError, _err:
+ except OSError as _err:
util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd),
_err.message)
return False
@@ -217,11 +217,11 @@ class DataSourceAltCloud(sources.DataSource):
cmd.append('--exit-if-exists=' + floppy_dev)
(cmd_out, _err) = util.subp(cmd)
LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out))
- except ProcessExecutionError, _err:
+ except ProcessExecutionError as _err:
util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd),
_err.message)
return False
- except OSError, _err:
+ except OSError as _err:
util.logexc(LOG, 'Failed command: %s\n%s', ' '.join(cmd),
_err.message)
return False
diff --git a/cloudinit/sources/DataSourceAzure.py b/cloudinit/sources/DataSourceAzure.py
index 09bc196d..c599d50f 100644
--- a/cloudinit/sources/DataSourceAzure.py
+++ b/cloudinit/sources/DataSourceAzure.py
@@ -124,7 +124,8 @@ class DataSourceAzureNet(sources.DataSource):
LOG.debug("using files cached in %s", ddir)
# azure / hyper-v provides random data here
- seed = util.load_file("/sys/firmware/acpi/tables/OEM0", quiet=True)
+ seed = util.load_file("/sys/firmware/acpi/tables/OEM0",
+ quiet=True, decode=False)
if seed:
self.metadata['random_seed'] = seed
@@ -151,7 +152,7 @@ class DataSourceAzureNet(sources.DataSource):
# walinux agent writes files world readable, but expects
# the directory to be protected.
- write_files(ddir, files, dirmode=0700)
+ write_files(ddir, files, dirmode=0o700)
# handle the hostname 'publishing'
try:
@@ -390,7 +391,7 @@ def write_files(datadir, files, dirmode=None):
util.ensure_dir(datadir, dirmode)
for (name, content) in files.items():
util.write_file(filename=os.path.join(datadir, name),
- content=content, mode=0600)
+ content=content, mode=0o600)
def invoke_agent(cmd):
diff --git a/cloudinit/sources/DataSourceConfigDrive.py b/cloudinit/sources/DataSourceConfigDrive.py
index 15244a0d..eb474079 100644
--- a/cloudinit/sources/DataSourceConfigDrive.py
+++ b/cloudinit/sources/DataSourceConfigDrive.py
@@ -216,11 +216,11 @@ def on_first_boot(data, distro=None):
files = data.get('files', {})
if files:
LOG.debug("Writing %s injected files", len(files))
- for (filename, content) in files.iteritems():
+ for (filename, content) in files.items():
if not filename.startswith(os.sep):
filename = os.sep + filename
try:
- util.write_file(filename, content, mode=0660)
+ util.write_file(filename, content, mode=0o660)
except IOError:
util.logexc(LOG, "Failed writing file: %s", filename)
diff --git a/cloudinit/sources/DataSourceDigitalOcean.py b/cloudinit/sources/DataSourceDigitalOcean.py
index 8f27ee89..b20ce2a1 100644
--- a/cloudinit/sources/DataSourceDigitalOcean.py
+++ b/cloudinit/sources/DataSourceDigitalOcean.py
@@ -18,7 +18,7 @@ from cloudinit import log as logging
from cloudinit import util
from cloudinit import sources
from cloudinit import ec2_utils
-from types import StringType
+
import functools
@@ -72,10 +72,11 @@ class DataSourceDigitalOcean(sources.DataSource):
return "\n".join(self.metadata['vendor-data'])
def get_public_ssh_keys(self):
- if type(self.metadata['public-keys']) is StringType:
- return [self.metadata['public-keys']]
+ public_keys = self.metadata['public-keys']
+ if isinstance(public_keys, list):
+ return public_keys
else:
- return self.metadata['public-keys']
+ return [public_keys]
@property
def availability_zone(self):
diff --git a/cloudinit/sources/DataSourceEc2.py b/cloudinit/sources/DataSourceEc2.py
index 1b20ecf3..798869b7 100644
--- a/cloudinit/sources/DataSourceEc2.py
+++ b/cloudinit/sources/DataSourceEc2.py
@@ -156,8 +156,8 @@ class DataSourceEc2(sources.DataSource):
# 'ephemeral0': '/dev/sdb',
# 'root': '/dev/sda1'}
found = None
- bdm_items = self.metadata['block-device-mapping'].iteritems()
- for (entname, device) in bdm_items:
+ bdm = self.metadata['block-device-mapping']
+ for (entname, device) in bdm.items():
if entname == name:
found = device
break
diff --git a/cloudinit/sources/DataSourceMAAS.py b/cloudinit/sources/DataSourceMAAS.py
index dfe90bc6..082cc58f 100644
--- a/cloudinit/sources/DataSourceMAAS.py
+++ b/cloudinit/sources/DataSourceMAAS.py
@@ -18,12 +18,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
+
from email.utils import parsedate
import errno
-import oauth.oauth as oauth
+import oauthlib
import os
import time
-import urllib2
+
+from six.moves.urllib_request import Request, urlopen
from cloudinit import log as logging
from cloudinit import sources
@@ -262,7 +265,7 @@ def check_seed_contents(content, seed):
userdata = content.get('user-data', "")
md = {}
- for (key, val) in content.iteritems():
+ for (key, val) in content.items():
if key == 'user-data':
continue
md[key] = val
@@ -272,25 +275,14 @@ def check_seed_contents(content, seed):
def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret,
timestamp=None):
- consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
- token = oauth.OAuthToken(token_key, token_secret)
-
- if timestamp is None:
- ts = int(time.time())
- else:
- ts = timestamp
-
- params = {
- 'oauth_version': "1.0",
- 'oauth_nonce': oauth.generate_nonce(),
- 'oauth_timestamp': ts,
- 'oauth_token': token.key,
- 'oauth_consumer_key': consumer.key,
- }
- req = oauth.OAuthRequest(http_url=url, parameters=params)
- req.sign_request(oauth.OAuthSignatureMethod_PLAINTEXT(),
- consumer, token)
- return req.to_header()
+ client = oauthlib.oauth1.Client(
+ consumer_key,
+ client_secret=consumer_secret,
+ resource_owner_key=token_key,
+ resource_owner_secret=token_secret,
+ signature_method=oauthlib.SIGNATURE_PLAINTEXT)
+ uri, signed_headers, body = client.sign(url)
+ return signed_headers
class MAASSeedDirNone(Exception):
@@ -357,11 +349,11 @@ if __name__ == "__main__":
creds[key] = cfg[key]
def geturl(url, headers_cb):
- req = urllib2.Request(url, data=None, headers=headers_cb(url))
- return (urllib2.urlopen(req).read())
+ req = Request(url, data=None, headers=headers_cb(url))
+ return urlopen(req).read()
def printurl(url, headers_cb):
- print "== %s ==\n%s\n" % (url, geturl(url, headers_cb))
+ print("== %s ==\n%s\n" % (url, geturl(url, headers_cb)))
def crawl(url, headers_cb=None):
if url.endswith("/"):
@@ -386,9 +378,9 @@ if __name__ == "__main__":
version=args.apiver)
else:
(userdata, metadata) = read_maas_seed_url(args.url)
- print "=== userdata ==="
- print userdata
- print "=== metadata ==="
+ print("=== userdata ===")
+ print(userdata)
+ print("=== metadata ===")
pprint.pprint(metadata)
elif args.subcmd == "get":
diff --git a/cloudinit/sources/DataSourceOVF.py b/cloudinit/sources/DataSourceOVF.py
index 7ba60735..58a4b2a2 100644
--- a/cloudinit/sources/DataSourceOVF.py
+++ b/cloudinit/sources/DataSourceOVF.py
@@ -66,7 +66,7 @@ class DataSourceOVF(sources.DataSource):
np = {'iso': transport_iso9660,
'vmware-guestd': transport_vmware_guestd, }
name = None
- for (name, transfunc) in np.iteritems():
+ for (name, transfunc) in np.items():
(contents, _dev, _fname) = transfunc()
if contents:
break
@@ -138,7 +138,7 @@ def read_ovf_environment(contents):
ud = ""
cfg_props = ['password']
md_props = ['seedfrom', 'local-hostname', 'public-keys', 'instance-id']
- for (prop, val) in props.iteritems():
+ for (prop, val) in props.items():
if prop == 'hostname':
prop = "local-hostname"
if prop in md_props:
@@ -183,7 +183,7 @@ def transport_iso9660(require_iso=True):
# Go through mounts to see if it was already mounted
mounts = util.mounts()
- for (dev, info) in mounts.iteritems():
+ for (dev, info) in mounts.items():
fstype = info['fstype']
if fstype != "iso9660" and require_iso:
continue
diff --git a/cloudinit/sources/DataSourceOpenNebula.py b/cloudinit/sources/DataSourceOpenNebula.py
index e2469f6e..61709c1b 100644
--- a/cloudinit/sources/DataSourceOpenNebula.py
+++ b/cloudinit/sources/DataSourceOpenNebula.py
@@ -34,6 +34,7 @@ from cloudinit import log as logging
from cloudinit import sources
from cloudinit import util
+
LOG = logging.getLogger(__name__)
DEFAULT_IID = "iid-dsopennebula"
@@ -280,7 +281,7 @@ def parse_shell_config(content, keylist=None, bash=None, asuser=None,
# allvars expands to all existing variables by using '${!x*}' notation
# where x is lower or upper case letters or '_'
- allvars = ["${!%s*}" % x for x in string.letters + "_"]
+ allvars = ["${!%s*}" % x for x in string.ascii_letters + "_"]
keylist_in = keylist
if keylist is None:
@@ -379,9 +380,8 @@ def read_context_disk_dir(source_dir, asuser=None):
raise BrokenContextDiskDir("configured user '%s' "
"does not exist", asuser)
try:
- with open(os.path.join(source_dir, 'context.sh'), 'r') as f:
- content = f.read().strip()
-
+ path = os.path.join(source_dir, 'context.sh')
+ content = util.load_file(path)
context = parse_shell_config(content, asuser=asuser)
except util.ProcessExecutionError as e:
raise BrokenContextDiskDir("Error processing context.sh: %s" % (e))
@@ -426,14 +426,14 @@ def read_context_disk_dir(source_dir, asuser=None):
context.get('USER_DATA_ENCODING'))
if encoding == "base64":
try:
- results['userdata'] = base64.b64decode(results['userdata'])
+ results['userdata'] = util.b64d(results['userdata'])
except TypeError:
LOG.warn("Failed base64 decoding of userdata")
# generate static /etc/network/interfaces
# only if there are any required context variables
# http://opennebula.org/documentation:rel3.8:cong#network_configuration
- for k in context.keys():
+ for k in context:
if re.match(r'^ETH\d+_IP$', k):
(out, _) = util.subp(['/sbin/ip', 'link'])
net = OpenNebulaNetwork(out, context)
diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py
index 86b8775a..9d48beab 100644
--- a/cloudinit/sources/DataSourceSmartOS.py
+++ b/cloudinit/sources/DataSourceSmartOS.py
@@ -30,12 +30,13 @@
# Comments with "@datadictionary" are snippets of the definition
import base64
+import binascii
+import os
+import serial
+
from cloudinit import log as logging
from cloudinit import sources
from cloudinit import util
-import os
-import os.path
-import serial
LOG = logging.getLogger(__name__)
@@ -201,7 +202,7 @@ class DataSourceSmartOS(sources.DataSource):
if b64_all is not None:
self.b64_all = util.is_true(b64_all)
- for ci_noun, attribute in SMARTOS_ATTRIB_MAP.iteritems():
+ for ci_noun, attribute in SMARTOS_ATTRIB_MAP.items():
smartos_noun, strip = attribute
md[ci_noun] = self.query(smartos_noun, strip=strip)
@@ -218,11 +219,12 @@ class DataSourceSmartOS(sources.DataSource):
user_script = os.path.join(data_d, 'user-script')
u_script_l = "%s/user-script" % LEGACY_USER_D
write_boot_content(md.get('user-script'), content_f=user_script,
- link=u_script_l, shebang=True, mode=0700)
+ link=u_script_l, shebang=True, mode=0o700)
operator_script = os.path.join(data_d, 'operator-script')
write_boot_content(md.get('operator-script'),
- content_f=operator_script, shebang=False, mode=0700)
+ content_f=operator_script, shebang=False,
+ mode=0o700)
# @datadictionary: This key has no defined format, but its value
# is written to the file /var/db/mdata-user-data on each boot prior
@@ -349,8 +351,9 @@ def query_data(noun, seed_device, seed_timeout, strip=False, default=None,
if b64:
try:
- return base64.b64decode(resp)
- except TypeError:
+ return util.b64d(resp)
+ # Bogus input produces different errors in Python 2 and 3; catch both.
+ except (TypeError, binascii.Error):
LOG.warn("Failed base64 decoding key '%s'", noun)
return resp
@@ -368,7 +371,7 @@ def dmi_data():
def write_boot_content(content, content_f, link=None, shebang=False,
- mode=0400):
+ mode=0o400):
"""
Write the content to content_f. Under the following rules:
1. If no content, remove the file
diff --git a/cloudinit/sources/__init__.py b/cloudinit/sources/__init__.py
index 7c7ef9ab..39eab51b 100644
--- a/cloudinit/sources/__init__.py
+++ b/cloudinit/sources/__init__.py
@@ -23,6 +23,8 @@
import abc
import os
+import six
+
from cloudinit import importer
from cloudinit import log as logging
from cloudinit import type_utils
@@ -130,7 +132,7 @@ class DataSource(object):
# we want to return the correct value for what will actually
# exist in this instance
mappings = {"sd": ("vd", "xvd", "vtb")}
- for (nfrom, tlist) in mappings.iteritems():
+ for (nfrom, tlist) in mappings.items():
if not short_name.startswith(nfrom):
continue
for nto in tlist:
@@ -218,18 +220,18 @@ def normalize_pubkey_data(pubkey_data):
if not pubkey_data:
return keys
- if isinstance(pubkey_data, (basestring, str)):
+ if isinstance(pubkey_data, six.string_types):
return str(pubkey_data).splitlines()
if isinstance(pubkey_data, (list, set)):
return list(pubkey_data)
if isinstance(pubkey_data, (dict)):
- for (_keyname, klist) in pubkey_data.iteritems():
+ for (_keyname, klist) in pubkey_data.items():
# lp:506332 uec metadata service responds with
# data that makes boto populate a string for 'klist' rather
# than a list.
- if isinstance(klist, (str, basestring)):
+ if isinstance(klist, six.string_types):
klist = [klist]
if isinstance(klist, (list, set)):
for pkey in klist:
diff --git a/cloudinit/sources/helpers/openstack.py b/cloudinit/sources/helpers/openstack.py
index b7e19314..88c7a198 100644
--- a/cloudinit/sources/helpers/openstack.py
+++ b/cloudinit/sources/helpers/openstack.py
@@ -24,6 +24,8 @@ import copy
import functools
import os
+import six
+
from cloudinit import ec2_utils
from cloudinit import log as logging
from cloudinit import sources
@@ -205,7 +207,7 @@ class BaseReader(object):
"""
load_json_anytype = functools.partial(
- util.load_json, root_types=(dict, basestring, list))
+ util.load_json, root_types=(dict, list) + six.string_types)
def datafiles(version):
files = {}
@@ -234,7 +236,7 @@ class BaseReader(object):
'version': 2,
}
data = datafiles(self._find_working_version())
- for (name, (path, required, translator)) in data.iteritems():
+ for (name, (path, required, translator)) in data.items():
path = self._path_join(self.base_path, path)
data = None
found = False
@@ -364,7 +366,7 @@ class ConfigDriveReader(BaseReader):
raise NonReadable("%s: no files found" % (self.base_path))
md = {}
- for (name, (key, translator, default)) in FILES_V1.iteritems():
+ for (name, (key, translator, default)) in FILES_V1.items():
if name in found:
path = found[name]
try:
@@ -478,7 +480,7 @@ def convert_vendordata_json(data, recurse=True):
"""
if not data:
return None
- if isinstance(data, (str, unicode, basestring)):
+ if isinstance(data, six.string_types):
return data
if isinstance(data, list):
return copy.deepcopy(data)
diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py
index 14d0cb0f..9b2f5ed5 100644
--- a/cloudinit/ssh_util.py
+++ b/cloudinit/ssh_util.py
@@ -239,7 +239,7 @@ def setup_user_keys(keys, username, options=None):
# Make sure the users .ssh dir is setup accordingly
(ssh_dir, pwent) = users_ssh_info(username)
if not os.path.isdir(ssh_dir):
- util.ensure_dir(ssh_dir, mode=0700)
+ util.ensure_dir(ssh_dir, mode=0o700)
util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid)
# Turn the 'update' keys given into actual entries
@@ -252,8 +252,8 @@ def setup_user_keys(keys, username, options=None):
(auth_key_fn, auth_key_entries) = extract_authorized_keys(username)
with util.SeLinuxGuard(ssh_dir, recursive=True):
content = update_authorized_keys(auth_key_entries, key_entries)
- util.ensure_dir(os.path.dirname(auth_key_fn), mode=0700)
- util.write_file(auth_key_fn, content, mode=0600)
+ util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700)
+ util.write_file(auth_key_fn, content, mode=0o600)
util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid)
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index 67f467f7..f4f4591d 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -20,12 +20,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import cPickle as pickle
-
import copy
import os
import sys
+import six
+from six.moves import cPickle as pickle
+
from cloudinit.settings import (PER_INSTANCE, FREQUENCIES, CLOUD_CONFIG)
from cloudinit import handlers
@@ -202,7 +203,7 @@ class Init(object):
util.logexc(LOG, "Failed pickling datasource %s", self.datasource)
return False
try:
- util.write_file(pickled_fn, pk_contents, mode=0400)
+ util.write_file(pickled_fn, pk_contents, mode=0o400)
except Exception:
util.logexc(LOG, "Failed pickling datasource to %s", pickled_fn)
return False
@@ -324,15 +325,15 @@ class Init(object):
def _store_userdata(self):
raw_ud = "%s" % (self.datasource.get_userdata_raw())
- util.write_file(self._get_ipath('userdata_raw'), raw_ud, 0600)
+ util.write_file(self._get_ipath('userdata_raw'), raw_ud, 0o600)
processed_ud = "%s" % (self.datasource.get_userdata())
- util.write_file(self._get_ipath('userdata'), processed_ud, 0600)
+ util.write_file(self._get_ipath('userdata'), processed_ud, 0o600)
def _store_vendordata(self):
raw_vd = "%s" % (self.datasource.get_vendordata_raw())
- util.write_file(self._get_ipath('vendordata_raw'), raw_vd, 0600)
+ util.write_file(self._get_ipath('vendordata_raw'), raw_vd, 0o600)
processed_vd = "%s" % (self.datasource.get_vendordata())
- util.write_file(self._get_ipath('vendordata'), processed_vd, 0600)
+ util.write_file(self._get_ipath('vendordata'), processed_vd, 0o600)
def _default_handlers(self, opts=None):
if opts is None:
@@ -384,7 +385,7 @@ class Init(object):
if not path or not os.path.isdir(path):
return
potential_handlers = util.find_modules(path)
- for (fname, mod_name) in potential_handlers.iteritems():
+ for (fname, mod_name) in potential_handlers.items():
try:
mod_locs, looked_locs = importer.find_module(
mod_name, [''], ['list_types', 'handle_part'])
@@ -422,7 +423,7 @@ class Init(object):
def init_handlers():
# Init the handlers first
- for (_ctype, mod) in c_handlers.iteritems():
+ for (_ctype, mod) in c_handlers.items():
if mod in c_handlers.initialized:
# Avoid initing the same module twice (if said module
# is registered to more than one content-type).
@@ -449,7 +450,7 @@ class Init(object):
def finalize_handlers():
# Give callbacks opportunity to finalize
- for (_ctype, mod) in c_handlers.iteritems():
+ for (_ctype, mod) in c_handlers.items():
if mod not in c_handlers.initialized:
# Said module was never inited in the first place, so lets
# not attempt to finalize those that never got called.
@@ -574,7 +575,7 @@ class Modules(object):
for item in cfg_mods:
if not item:
continue
- if isinstance(item, (str, basestring)):
+ if isinstance(item, six.string_types):
module_list.append({
'mod': item.strip(),
})
diff --git a/cloudinit/templater.py b/cloudinit/templater.py
index 4cd3f13d..a9231482 100644
--- a/cloudinit/templater.py
+++ b/cloudinit/templater.py
@@ -137,7 +137,7 @@ def render_from_file(fn, params):
return renderer(content, params)
-def render_to_file(fn, outfn, params, mode=0644):
+def render_to_file(fn, outfn, params, mode=0o644):
contents = render_from_file(fn, params)
util.write_file(outfn, contents, mode=mode)
diff --git a/cloudinit/type_utils.py b/cloudinit/type_utils.py
index cc3d9495..b93efd6a 100644
--- a/cloudinit/type_utils.py
+++ b/cloudinit/type_utils.py
@@ -22,11 +22,31 @@
import types
+import six
+
+
+if six.PY3:
+ _NAME_TYPES = (
+ types.ModuleType,
+ types.FunctionType,
+ types.LambdaType,
+ type,
+ )
+else:
+ _NAME_TYPES = (
+ types.TypeType,
+ types.ModuleType,
+ types.FunctionType,
+ types.LambdaType,
+ types.ClassType,
+ )
+
def obj_name(obj):
- if isinstance(obj, (types.TypeType,
- types.ModuleType,
- types.FunctionType,
- types.LambdaType)):
- return str(obj.__name__)
- return obj_name(obj.__class__)
+ if isinstance(obj, _NAME_TYPES):
+ return six.text_type(obj.__name__)
+ else:
+ if not hasattr(obj, '__class__'):
+ return repr(obj)
+ else:
+ return obj_name(obj.__class__)
diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py
index 3074dd08..62001dff 100644
--- a/cloudinit/url_helper.py
+++ b/cloudinit/url_helper.py
@@ -20,21 +20,29 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import httplib
import time
-import urllib
+
+import six
import requests
from requests import exceptions
-from urlparse import (urlparse, urlunparse)
+from six.moves.urllib.parse import (
+ urlparse, urlunparse,
+ quote as urlquote)
from cloudinit import log as logging
from cloudinit import version
LOG = logging.getLogger(__name__)
-NOT_FOUND = httplib.NOT_FOUND
+if six.PY2:
+ import httplib
+ NOT_FOUND = httplib.NOT_FOUND
+else:
+ import http.client
+ NOT_FOUND = http.client.NOT_FOUND
+
# Check if requests has ssl support (added in requests >= 0.8.8)
SSL_ENABLED = False
@@ -70,7 +78,7 @@ def combine_url(base, *add_ons):
path = url_parsed[2]
if path and not path.endswith("/"):
path += "/"
- path += urllib.quote(str(add_on), safe="/:")
+ path += urlquote(str(add_on), safe="/:")
url_parsed[2] = path
return urlunparse(url_parsed)
@@ -111,7 +119,7 @@ class UrlResponse(object):
@property
def contents(self):
- return self._response.content
+ return self._response.text
@property
def url(self):
@@ -135,7 +143,7 @@ class UrlResponse(object):
return self._response.status_code
def __str__(self):
- return self.contents
+ return self._response.text
class UrlError(IOError):
diff --git a/cloudinit/user_data.py b/cloudinit/user_data.py
index de6487d8..fe343d0c 100644
--- a/cloudinit/user_data.py
+++ b/cloudinit/user_data.py
@@ -29,6 +29,8 @@ from email.mime.multipart import MIMEMultipart
from email.mime.nonmultipart import MIMENonMultipart
from email.mime.text import MIMEText
+import six
+
from cloudinit import handlers
from cloudinit import log as logging
from cloudinit import util
@@ -106,7 +108,7 @@ class UserDataProcessor(object):
ctype = None
ctype_orig = part.get_content_type()
- payload = part.get_payload(decode=True)
+ payload = util.fully_decoded_payload(part)
was_compressed = False
# When the message states it is of a gzipped content type ensure
@@ -235,7 +237,7 @@ class UserDataProcessor(object):
resp = util.read_file_or_url(include_url,
ssl_details=self.ssl_details)
if include_once_on and resp.ok():
- util.write_file(include_once_fn, str(resp), mode=0600)
+ util.write_file(include_once_fn, resp, mode=0o600)
if resp.ok():
content = str(resp)
else:
@@ -256,7 +258,7 @@ class UserDataProcessor(object):
# filename and type not be present
# or
# scalar(payload)
- if isinstance(ent, (str, basestring)):
+ if isinstance(ent, six.string_types):
ent = {'content': ent}
if not isinstance(ent, (dict)):
# TODO(harlowja) raise?
@@ -337,7 +339,7 @@ def convert_string(raw_data, headers=None):
data = util.decomp_gzip(raw_data)
if "mime-version:" in data[0:4096].lower():
msg = email.message_from_string(data)
- for (key, val) in headers.iteritems():
+ for (key, val) in headers.items():
_replace_header(msg, key, val)
else:
mtype = headers.get(CONTENT_TYPE, NOT_MULTIPART_TYPE)
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 26456aa6..b845adfd 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -20,8 +20,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from StringIO import StringIO
-
import contextlib
import copy as obj_copy
import ctypes
@@ -45,8 +43,11 @@ import subprocess
import sys
import tempfile
import time
-import urlparse
+from base64 import b64decode, b64encode
+from six.moves.urllib import parse as urlparse
+
+import six
import yaml
from cloudinit import importer
@@ -69,8 +70,60 @@ FN_REPLACEMENTS = {
}
FN_ALLOWED = ('_-.()' + string.digits + string.ascii_letters)
+TRUE_STRINGS = ('true', '1', 'on', 'yes')
+FALSE_STRINGS = ('off', '0', 'no', 'false')
+
+
# Helper utils to see if running in a container
-CONTAINER_TESTS = ['running-in-container', 'lxc-is-container']
+CONTAINER_TESTS = ('running-in-container', 'lxc-is-container')
+
+
+def decode_binary(blob, encoding='utf-8'):
+ # Converts a binary type into a text type using given encoding.
+ if isinstance(blob, six.text_type):
+ return blob
+ return blob.decode(encoding)
+
+
+def encode_text(text, encoding='utf-8'):
+ # Converts a text string into a binary type using given encoding.
+ if isinstance(text, six.binary_type):
+ return text
+ return text.encode(encoding)
+
+
+def b64d(source):
+ # Base64 decode some data, accepting bytes or unicode/str, and returning
+ # str/unicode if the result is utf-8 compatible, otherwise returning bytes.
+ decoded = b64decode(source)
+ try:
+ return decoded.decode('utf-8')
+ except UnicodeDecodeError:
+ return decoded
+
+
+def b64e(source):
+ # Base64 encode some data, accepting bytes or unicode/str, and returning
+ # str/unicode if the result is utf-8 compatible, otherwise returning bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return b64encode(source).decode('utf-8')
+
+
+def fully_decoded_payload(part):
+ # In Python 3, decoding the payload will ironically hand us a bytes object.
+ # 'decode' means to decode according to Content-Transfer-Encoding, not
+ # according to any charset in the Content-Type. So, if we end up with
+ # bytes, first try to decode to str via CT charset, and failing that, try
+ # utf-8 using surrogate escapes.
+ cte_payload = part.get_payload(decode=True)
+ if (six.PY3 and
+ part.get_content_maintype() == 'text' and
+ isinstance(cte_payload, bytes)):
+ charset = part.get_charset() or 'utf-8'
+ return cte_payload.decode(charset, errors='surrogateescape')
+ return cte_payload
+
# Path for DMI Data
DMI_SYS_PATH = "/sys/class/dmi/id"
@@ -98,7 +151,7 @@ class ProcessExecutionError(IOError):
else:
self.description = description
- if not isinstance(exit_code, (long, int)):
+ if not isinstance(exit_code, six.integer_types):
self.exit_code = '-'
else:
self.exit_code = exit_code
@@ -127,6 +180,9 @@ class ProcessExecutionError(IOError):
'reason': self.reason,
}
IOError.__init__(self, message)
+ # For backward compatibility with Python 2.
+ if not hasattr(self, 'message'):
+ self.message = message
class SeLinuxGuard(object):
@@ -154,7 +210,8 @@ class SeLinuxGuard(object):
path = os.path.realpath(self.path)
# path should be a string, not unicode
- path = str(path)
+ if six.PY2:
+ path = str(path)
try:
stats = os.lstat(path)
self.selinux.matchpathcon(path, stats[stat.ST_MODE])
@@ -212,10 +269,10 @@ def fork_cb(child_cb, *args, **kwargs):
def is_true(val, addons=None):
if isinstance(val, (bool)):
return val is True
- check_set = ['true', '1', 'on', 'yes']
+ check_set = TRUE_STRINGS
if addons:
- check_set = check_set + addons
- if str(val).lower().strip() in check_set:
+ check_set = list(check_set) + addons
+ if six.text_type(val).lower().strip() in check_set:
return True
return False
@@ -223,10 +280,10 @@ def is_true(val, addons=None):
def is_false(val, addons=None):
if isinstance(val, (bool)):
return val is False
- check_set = ['off', '0', 'no', 'false']
+ check_set = FALSE_STRINGS
if addons:
- check_set = check_set + addons
- if str(val).lower().strip() in check_set:
+ check_set = list(check_set) + addons
+ if six.text_type(val).lower().strip() in check_set:
return True
return False
@@ -244,7 +301,7 @@ def translate_bool(val, addons=None):
def rand_str(strlen=32, select_from=None):
if not select_from:
- select_from = string.letters + string.digits
+ select_from = string.ascii_letters + string.digits
return "".join([random.choice(select_from) for _x in range(0, strlen)])
@@ -276,7 +333,7 @@ def uniq_merge_sorted(*lists):
def uniq_merge(*lists):
combined_list = []
for a_list in lists:
- if isinstance(a_list, (str, basestring)):
+ if isinstance(a_list, six.string_types):
a_list = a_list.strip().split(",")
# Kickout the empty ones
a_list = [a for a in a_list if len(a)]
@@ -285,7 +342,7 @@ def uniq_merge(*lists):
def clean_filename(fn):
- for (k, v) in FN_REPLACEMENTS.iteritems():
+ for (k, v) in FN_REPLACEMENTS.items():
fn = fn.replace(k, v)
removals = []
for k in fn:
@@ -297,16 +354,19 @@ def clean_filename(fn):
return fn
-def decomp_gzip(data, quiet=True):
+def decomp_gzip(data, quiet=True, decode=True):
try:
- buf = StringIO(str(data))
+ buf = six.BytesIO(encode_text(data))
with contextlib.closing(gzip.GzipFile(None, "rb", 1, buf)) as gh:
- return gh.read()
+ if decode:
+ return decode_binary(gh.read())
+ else:
+ return gh.read()
except Exception as e:
if quiet:
return data
else:
- raise DecompressionError(str(e))
+ raise DecompressionError(six.text_type(e))
def extract_usergroup(ug_pair):
@@ -365,7 +425,7 @@ def multi_log(text, console=True, stderr=True,
def load_json(text, root_types=(dict,)):
- decoded = json.loads(text)
+ decoded = json.loads(decode_binary(text))
if not isinstance(decoded, tuple(root_types)):
expected_types = ", ".join([str(t) for t in root_types])
raise TypeError("(%s) root types expected, got %s instead"
@@ -397,7 +457,7 @@ def get_cfg_option_str(yobj, key, default=None):
if key not in yobj:
return default
val = yobj[key]
- if not isinstance(val, (str, basestring)):
+ if not isinstance(val, six.string_types):
val = str(val)
return val
@@ -436,7 +496,7 @@ def get_cfg_option_list(yobj, key, default=None):
if isinstance(val, (list)):
cval = [v for v in val]
return cval
- if not isinstance(val, (basestring)):
+ if not isinstance(val, six.string_types):
val = str(val)
return [val]
@@ -711,10 +771,10 @@ def read_file_or_url(url, timeout=5, retries=10,
def load_yaml(blob, default=None, allowed=(dict,)):
loaded = default
+ blob = decode_binary(blob)
try:
- blob = str(blob)
- LOG.debug(("Attempting to load yaml from string "
- "of length %s with allowed root types %s"),
+ LOG.debug("Attempting to load yaml from string "
+ "of length %s with allowed root types %s",
len(blob), allowed)
converted = safeyaml.load(blob)
if not isinstance(converted, allowed):
@@ -749,14 +809,12 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0):
md_resp = read_file_or_url(md_url, timeout, retries, file_retries)
md = None
if md_resp.ok():
- md_str = str(md_resp)
- md = load_yaml(md_str, default={})
+ md = load_yaml(md_resp.contents, default={})
ud_resp = read_file_or_url(ud_url, timeout, retries, file_retries)
ud = None
if ud_resp.ok():
- ud_str = str(ud_resp)
- ud = ud_str
+ ud = ud_resp.contents
return (md, ud)
@@ -787,7 +845,7 @@ def read_conf_with_confd(cfgfile):
if "conf_d" in cfg:
confd = cfg['conf_d']
if confd:
- if not isinstance(confd, (str, basestring)):
+ if not isinstance(confd, six.string_types):
raise TypeError(("Config file %s contains 'conf_d' "
"with non-string type %s") %
(cfgfile, type_utils.obj_name(confd)))
@@ -924,8 +982,8 @@ def get_cmdline_url(names=('cloud-config-url', 'url'),
return (None, None, None)
resp = read_file_or_url(url)
- if resp.contents.startswith(starts) and resp.ok():
- return (key, url, str(resp))
+ if resp.ok() and resp.contents.startswith(starts):
+ return (key, url, resp.contents)
return (key, url, None)
@@ -1079,9 +1137,9 @@ def uniq_list(in_list):
return out_list
-def load_file(fname, read_cb=None, quiet=False):
+def load_file(fname, read_cb=None, quiet=False, decode=True):
LOG.debug("Reading from %s (quiet=%s)", fname, quiet)
- ofh = StringIO()
+ ofh = six.BytesIO()
try:
with open(fname, 'rb') as ifh:
pipe_in_out(ifh, ofh, chunk_cb=read_cb)
@@ -1092,7 +1150,10 @@ def load_file(fname, read_cb=None, quiet=False):
raise
contents = ofh.getvalue()
LOG.debug("Read %s bytes from %s", len(contents), fname)
- return contents
+ if decode:
+ return decode_binary(contents)
+ else:
+ return contents
def get_cmdline():
@@ -1110,7 +1171,7 @@ def pipe_in_out(in_fh, out_fh, chunk_size=1024, chunk_cb=None):
bytes_piped = 0
while True:
data = in_fh.read(chunk_size)
- if data == '':
+ if len(data) == 0:
break
else:
out_fh.write(data)
@@ -1216,13 +1277,20 @@ def logexc(log, msg, *args):
# coming out to a non-debug stream
if msg:
log.warn(msg, *args)
- # Debug gets the full trace
- log.debug(msg, exc_info=1, *args)
+ # Debug gets the full trace. However, nose has a bug whereby its
+ # logcapture plugin doesn't properly handle the case where there is no
+ # actual exception. To avoid tracebacks during the test suite then, we'll
+ # do the actual exc_info extraction here, and if there is no exception in
+ # flight, we'll just pass in None.
+ exc_info = sys.exc_info()
+ if exc_info == (None, None, None):
+ exc_info = None
+ log.debug(msg, exc_info=exc_info, *args)
def hash_blob(blob, routine, mlen=None):
hasher = hashlib.new(routine)
- hasher.update(blob)
+ hasher.update(encode_text(blob))
digest = hasher.hexdigest()
# Don't get to long now
if mlen is not None:
@@ -1253,7 +1321,7 @@ def rename(src, dest):
os.rename(src, dest)
-def ensure_dirs(dirlist, mode=0755):
+def ensure_dirs(dirlist, mode=0o755):
for d in dirlist:
ensure_dir(d, mode)
@@ -1267,7 +1335,7 @@ def read_write_cmdline_url(target_fn):
return
try:
if key and content:
- write_file(target_fn, content, mode=0600)
+ write_file(target_fn, content, mode=0o600)
LOG.debug(("Wrote to %s with contents of command line"
" url %s (len=%s)"), target_fn, url, len(content))
elif key and not content:
@@ -1283,8 +1351,7 @@ def yaml_dumps(obj, explicit_start=True, explicit_end=True):
indent=4,
explicit_start=explicit_start,
explicit_end=explicit_end,
- default_flow_style=False,
- allow_unicode=True)
+ default_flow_style=False)
def ensure_dir(path, mode=None):
@@ -1492,7 +1559,7 @@ def append_file(path, content):
write_file(path, content, omode="ab", mode=None)
-def ensure_file(path, mode=0644):
+def ensure_file(path, mode=0o644):
write_file(path, content='', omode="ab", mode=mode)
@@ -1510,7 +1577,7 @@ def chmod(path, mode):
os.chmod(path, real_mode)
-def write_file(filename, content, mode=0644, omode="wb"):
+def write_file(filename, content, mode=0o644, omode="wb"):
"""
Writes a file with the given content and sets the file mode as specified.
Resotres the SELinux context if possible.
@@ -1518,11 +1585,17 @@ def write_file(filename, content, mode=0644, omode="wb"):
@param filename: The full path of the file to write.
@param content: The content to write to the file.
@param mode: The filesystem mode to set on the file.
- @param omode: The open mode used when opening the file (r, rb, a, etc.)
+ @param omode: The open mode used when opening the file (w, wb, a, etc.)
"""
ensure_dir(os.path.dirname(filename))
- LOG.debug("Writing to %s - %s: [%s] %s bytes",
- filename, omode, mode, len(content))
+ if 'b' in omode.lower():
+ content = encode_text(content)
+ write_type = 'bytes'
+ else:
+ content = decode_binary(content)
+ write_type = 'characters'
+ LOG.debug("Writing to %s - %s: [%s] %s %s",
+ filename, omode, mode, len(content), write_type)
with SeLinuxGuard(path=filename):
with open(filename, omode) as fh:
fh.write(content)
@@ -1564,9 +1637,12 @@ def subp(args, data=None, rcs=None, env=None, capture=True, shell=False,
stdout = subprocess.PIPE
stderr = subprocess.PIPE
stdin = subprocess.PIPE
- sp = subprocess.Popen(args, stdout=stdout,
- stderr=stderr, stdin=stdin,
- env=env, shell=shell)
+ kws = dict(stdout=stdout, stderr=stderr, stdin=stdin,
+ env=env, shell=shell)
+ if six.PY3:
+ # Use this so subprocess output will be (Python 3) str, not bytes.
+ kws['universal_newlines'] = True
+ sp = subprocess.Popen(args, **kws)
(out, err) = sp.communicate(data)
except OSError as e:
raise ProcessExecutionError(cmd=args, reason=e)
@@ -1611,10 +1687,10 @@ def shellify(cmdlist, add_header=True):
if isinstance(args, list):
fixed = []
for f in args:
- fixed.append("'%s'" % (str(f).replace("'", escaped)))
+ fixed.append("'%s'" % (six.text_type(f).replace("'", escaped)))
content = "%s%s\n" % (content, ' '.join(fixed))
cmds_made += 1
- elif isinstance(args, (str, basestring)):
+ elif isinstance(args, six.string_types):
content = "%s%s\n" % (content, args)
cmds_made += 1
else:
@@ -1725,7 +1801,7 @@ def expand_package_list(version_fmt, pkgs):
pkglist = []
for pkg in pkgs:
- if isinstance(pkg, basestring):
+ if isinstance(pkg, six.string_types):
pkglist.append(pkg)
continue
@@ -2021,23 +2097,23 @@ def _read_dmi_syspath(key):
Reads dmi data with from /sys/class/dmi/id
"""
- dmi_key = "{}/{}".format(DMI_SYS_PATH, key)
- LOG.debug("querying dmi data {}".format(dmi_key))
+ dmi_key = "{0}/{1}".format(DMI_SYS_PATH, key)
+ LOG.debug("querying dmi data {0}".format(dmi_key))
try:
if not os.path.exists(dmi_key):
- LOG.debug("did not find {}".format(dmi_key))
+ LOG.debug("did not find {0}".format(dmi_key))
return None
key_data = load_file(dmi_key)
if not key_data:
- LOG.debug("{} did not return any data".format(key))
+ LOG.debug("{0} did not return any data".format(key))
return None
- LOG.debug("dmi data {} returned {}".format(dmi_key, key_data))
+ LOG.debug("dmi data {0} returned {0}".format(dmi_key, key_data))
return key_data.strip()
except Exception as e:
- logexc(LOG, "failed read of {}".format(dmi_key), e)
+ logexc(LOG, "failed read of {0}".format(dmi_key), e)
return None
@@ -2049,10 +2125,10 @@ def _call_dmidecode(key, dmidecode_path):
try:
cmd = [dmidecode_path, "--string", key]
(result, _err) = subp(cmd)
- LOG.debug("dmidecode returned '{}' for '{}'".format(result, key))
+ LOG.debug("dmidecode returned '{0}' for '{0}'".format(result, key))
return result
- except OSError, _err:
- LOG.debug('failed dmidecode cmd: {}\n{}'.format(cmd, _err.message))
+ except OSError as _err:
+ LOG.debug('failed dmidecode cmd: {0}\n{0}'.format(cmd, _err.message))
return None
@@ -2068,7 +2144,7 @@ def read_dmi_data(key):
if dmidecode_path:
return _call_dmidecode(key, dmidecode_path)
- LOG.warn("did not find either path {} or dmidecode command".format(
+ LOG.warn("did not find either path {0} or dmidecode command".format(
DMI_SYS_PATH))
return None
diff --git a/packages/bddeb b/packages/bddeb
index 9d264f92..c4efe264 100755
--- a/packages/bddeb
+++ b/packages/bddeb
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
import os
import shutil
@@ -27,22 +27,35 @@ import argparse
# Package names that will showup in requires to what we can actually
# use in our debian 'control' file, this is a translation of the 'requires'
# file pypi package name to a debian/ubuntu package name.
-PKG_MP = {
- 'argparse': 'python-argparse',
- 'cheetah': 'python-cheetah',
- 'configobj': 'python-configobj',
- 'jinja2': 'python-jinja2',
- 'jsonpatch': 'python-jsonpatch | python-json-patch',
- 'oauth': 'python-oauth',
- 'prettytable': 'python-prettytable',
- 'pyserial': 'python-serial',
- 'pyyaml': 'python-yaml',
- 'requests': 'python-requests',
+STD_NAMED_PACKAGES = [
+ 'configobj',
+ 'jinja2',
+ 'jsonpatch',
+ 'oauthlib',
+ 'prettytable',
+ 'requests',
+ 'six',
+ 'httpretty',
+ 'mock',
+ 'nose',
+ 'setuptools',
+]
+NONSTD_NAMED_PACKAGES = {
+ 'argparse': ('python-argparse', None),
+ 'contextlib2': ('python-contextlib2', None),
+ 'cheetah': ('python-cheetah', None),
+ 'pyserial': ('python-serial', 'python3-serial'),
+ 'pyyaml': ('python-yaml', 'python3-yaml'),
+ 'six': ('python-six', 'python3-six'),
+ 'pep8': ('pep8', 'python3-pep8'),
+ 'pyflakes': ('pyflakes', 'pyflakes'),
}
+
DEBUILD_ARGS = ["-S", "-d"]
-def write_debian_folder(root, version, revno, append_requires=[]):
+def write_debian_folder(root, version, revno, pkgmap,
+ pyver="3", append_requires=[]):
deb_dir = util.abs_join(root, 'debian')
os.makedirs(deb_dir)
@@ -58,25 +71,42 @@ def write_debian_folder(root, version, revno, append_requires=[]):
# Write out the control file template
cmd = [util.abs_join(find_root(), 'tools', 'read-dependencies')]
(stdout, _stderr) = util.subp(cmd)
- pkgs = [p.lower().strip() for p in stdout.splitlines()]
+ pypi_pkgs = [p.lower().strip() for p in stdout.splitlines()]
+
+ (stdout, _stderr) = util.subp(cmd + ['test-requirements.txt'])
+ pypi_test_pkgs = [p.lower().strip() for p in stdout.splitlines()]
# Map to known packages
requires = append_requires
- for p in pkgs:
- tgt_pkg = PKG_MP.get(p)
- if not tgt_pkg:
- raise RuntimeError(("Do not know how to translate pypi dependency"
- " %r to a known package") % (p))
- else:
- requires.append(tgt_pkg)
+ test_requires = []
+ lists = ((pypi_pkgs, requires), (pypi_test_pkgs, test_requires))
+ for pypilist, target in lists:
+ for p in pypilist:
+ if p not in pkgmap:
+ raise RuntimeError(("Do not know how to translate pypi "
+ "dependency %r to a known package") % (p))
+ elif pkgmap[p]:
+ target.append(pkgmap[p])
+
+ if pyver == "3":
+ python = "python3"
+ else:
+ python = "python"
templater.render_to_file(util.abs_join(find_root(),
'packages', 'debian', 'control.in'),
util.abs_join(deb_dir, 'control'),
- params={'requires': requires})
+ params={'requires': ','.join(requires),
+ 'test_requires': ','.join(test_requires),
+ 'python': python})
+
+ templater.render_to_file(util.abs_join(find_root(),
+ 'packages', 'debian', 'rules.in'),
+ util.abs_join(deb_dir, 'rules'),
+ params={'python': python, 'pyver': pyver})
# Just copy the following directly
- for base_fn in ['dirs', 'copyright', 'compat', 'rules']:
+ for base_fn in ['dirs', 'copyright', 'compat']:
shutil.copy(util.abs_join(find_root(),
'packages', 'debian', base_fn),
util.abs_join(deb_dir, base_fn))
@@ -90,12 +120,16 @@ def main():
" (default: %(default)s)"),
default=False,
action='store_true')
- parser.add_argument("--no-cloud-utils", dest="no_cloud_utils",
- help=("don't depend on cloud-utils package"
+ parser.add_argument("--cloud-utils", dest="cloud_utils",
+ help=("depend on cloud-utils package"
" (default: %(default)s)"),
default=False,
action='store_true')
+ parser.add_argument("--python2", dest="python2",
+ help=("build debs for python2 rather than python3"),
+ default=False, action='store_true')
+
parser.add_argument("--init-system", dest="init_system",
help=("build deb with INIT_SYSTEM=xxx"
" (default: %(default)s"),
@@ -122,6 +156,18 @@ def main():
if args.verbose:
capture = False
+ pkgmap = {}
+ for p in NONSTD_NAMED_PACKAGES:
+ pkgmap[p] = NONSTD_NAMED_PACKAGES[p][int(not args.python2)]
+
+ for p in STD_NAMED_PACKAGES:
+ if args.python2:
+ pkgmap[p] = "python-" + p
+ pyver = "2"
+ else:
+ pkgmap[p] = "python3-" + p
+ pyver = "3"
+
with util.tempdir() as tdir:
cmd = [util.abs_join(find_root(), 'tools', 'read-version')]
@@ -152,11 +198,12 @@ def main():
shutil.move(extracted_name, xdir)
print("Creating a debian/ folder in %r" % (xdir))
- if not args.no_cloud_utils:
+ if args.cloud_utils:
append_requires=['cloud-utils | cloud-guest-utils']
else:
append_requires=[]
- write_debian_folder(xdir, version, revno, append_requires)
+ write_debian_folder(xdir, version, revno, pkgmap,
+ pyver=pyver, append_requires=append_requires)
# The naming here seems to follow some debian standard
# so it will whine if it is changed...
diff --git a/packages/brpm b/packages/brpm
index 9657b1dd..72bfca08 100755
--- a/packages/brpm
+++ b/packages/brpm
@@ -45,6 +45,7 @@ PKG_MP = {
'pyserial': 'pyserial',
'pyyaml': 'PyYAML',
'requests': 'python-requests',
+ 'six': 'python-six',
},
'suse': {
'argparse': 'python-argparse',
@@ -56,6 +57,7 @@ PKG_MP = {
'pyserial': 'python-pyserial',
'pyyaml': 'python-yaml',
'requests': 'python-requests',
+ 'six': 'python-six',
}
}
diff --git a/packages/debian/changelog.in b/packages/debian/changelog.in
index e3e94f54..c9affe47 100644
--- a/packages/debian/changelog.in
+++ b/packages/debian/changelog.in
@@ -1,4 +1,4 @@
-## This is a cheetah template
+## template:basic
cloud-init (${version}~bzr${revision}-1) UNRELEASED; urgency=low
* build
diff --git a/packages/debian/control.in b/packages/debian/control.in
index 9207e5f4..bd6e3867 100644
--- a/packages/debian/control.in
+++ b/packages/debian/control.in
@@ -1,4 +1,4 @@
-## This is a cheetah template
+## template:basic
Source: cloud-init
Section: admin
Priority: optional
@@ -6,31 +6,22 @@ Maintainer: Scott Moser <smoser@ubuntu.com>
Build-Depends: debhelper (>= 9),
dh-python,
dh-systemd,
- python (>= 2.6.6-3~),
- python-nose,
pyflakes,
- python-setuptools,
- python-selinux,
- python-cheetah,
- python-mocker,
- python-httpretty,
-#for $r in $requires
- ${r},
-#end for
+ ${python},
+ ${test_requires},
+ ${requires}
XS-Python-Version: all
-Standards-Version: 3.9.3
+Standards-Version: 3.9.6
Package: cloud-init
Architecture: all
Depends: procps,
- python,
-#for $r in $requires
- ${r},
-#end for
- python-software-properties | software-properties-common,
- \${misc:Depends},
+ ${python},
+ ${requires},
+ software-properties-common,
+ ${misc:Depends},
Recommends: sudo
-XB-Python-Version: \${python:Versions}
+XB-Python-Version: ${python:Versions}
Description: Init scripts for cloud instances
Cloud instances need special scripts to run during initialisation
to retrieve and install ssh keys and to let the user run various scripts.
diff --git a/packages/debian/copyright b/packages/debian/copyright
index f55bb7a3..c694f30d 100644
--- a/packages/debian/copyright
+++ b/packages/debian/copyright
@@ -5,7 +5,7 @@ Source: https://launchpad.net/cloud-init
This package was debianized by Soren Hansen <soren@ubuntu.com> on
Thu, 04 Sep 2008 12:49:15 +0200 as ec2-init. It was later renamed to
-cloud-utils by Scott Moser <scott.moser@canonical.com>
+cloud-init by Scott Moser <scott.moser@canonical.com>
Upstream Author: Scott Moser <smoser@canonical.com>
Soren Hansen <soren@canonical.com>
diff --git a/packages/debian/rules b/packages/debian/rules.in
index 9e0c5ddb..bb2e1d5c 100755
--- a/packages/debian/rules
+++ b/packages/debian/rules.in
@@ -1,10 +1,12 @@
+## template:basic
#!/usr/bin/make -f
INIT_SYSTEM ?= upstart,systemd
+PYVER ?= python${pyver}
export PYBUILD_INSTALL_ARGS=--init-system=$(INIT_SYSTEM)
%:
- dh $@ --with python2,systemd --buildsystem pybuild
+ dh $@ --with $(PYVER),systemd --buildsystem pybuild
override_dh_install:
dh_install
@@ -12,6 +14,6 @@ override_dh_install:
cp tools/21-cloudinit.conf debian/cloud-init/etc/rsyslog.d/21-cloudinit.conf
override_dh_auto_test:
- # Becuase setup tools didn't copy data...
- cp -r tests/data .pybuild/pythonX.Y_2.7/build/tests
+ # Because setup tools didn't copy data...
+ [ ! -d .pybuild/pythonX.Y_?.?/build/tests ] || cp -r tests/data .pybuild/pythonX.Y_?.?/build/tests
http_proxy= dh_auto_test -- --test-nose
diff --git a/requirements.txt b/requirements.txt
index 943dbef7..19c88857 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,6 @@
# Pypi requirements for cloud-init to work
# Used for untemplating any files or strings with parameters.
-cheetah
jinja2
# This is used for any pretty printing of tabular data.
@@ -9,7 +8,7 @@ PrettyTable
# This one is currently only used by the MAAS datasource. If that
# datasource is removed, this is no longer needed
-oauth
+oauthlib
# This one is currently used only by the CloudSigma and SmartOS datasources.
# If these datasources are removed, this is no longer needed
@@ -32,3 +31,6 @@ requests
# For patching pieces of cloud-config together
jsonpatch
+
+# For Python 2/3 compatibility
+six
diff --git a/setup.py b/setup.py
index 25f09e58..e88d9e88 100755
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,8 @@ def tiny_p(cmd, capture=True):
stdout = None
stderr = None
sp = subprocess.Popen(cmd, stdout=stdout,
- stderr=stderr, stdin=None)
+ stderr=stderr, stdin=None,
+ universal_newlines=True)
(out, err) = sp.communicate()
ret = sp.returncode
if ret not in [0]:
@@ -144,9 +145,9 @@ class InitsysInstallData(install):
raise DistutilsArgError(
"Invalid --init-system: %s" % (','.join(bad)))
- for sys in self.init_system:
+ for system in self.init_system:
self.distribution.data_files.append(
- (INITSYS_ROOTS[sys], INITSYS_FILES[sys]))
+ (INITSYS_ROOTS[system], INITSYS_FILES[system]))
# Force that command to reinitalize (with new file list)
self.distribution.reinitialize_command('install_data', True)
@@ -174,6 +175,11 @@ else:
}
+requirements = read_requires()
+if sys.version_info < (3,):
+ requirements.append('cheetah')
+
+
setuptools.setup(name='cloud-init',
version=get_version(),
description='EC2 initialisation magic',
@@ -186,6 +192,6 @@ setuptools.setup(name='cloud-init',
],
license='GPLv3',
data_files=data_files,
- install_requires=read_requires(),
+ install_requires=requirements,
cmdclass=cmdclass,
)
diff --git a/templates/resolv.conf.tmpl b/templates/resolv.conf.tmpl
index 1300156c..bfae80db 100644
--- a/templates/resolv.conf.tmpl
+++ b/templates/resolv.conf.tmpl
@@ -24,7 +24,7 @@ sortlist {% for sort in sortlist %}{{sort}} {% endfor %}
{% if options or flags %}
options {% for flag in flags %}{{flag}} {% endfor %}
-{% for key, value in options.iteritems() -%}
+{% for key, value in options.items() -%}
{{key}}:{{value}}
{% endfor %}
{% endif %}
diff --git a/test-requirements.txt b/test-requirements.txt
index 230f0404..9b3d07c5 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,6 +1,7 @@
httpretty>=0.7.1
mock
-mocker
nose
pep8==1.5.7
pyflakes
+contextlib2
+setuptools
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 52305397..6b9394b3 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,17 +1,25 @@
+from __future__ import print_function
+
import os
import sys
+import shutil
+import tempfile
import unittest
-from contextlib import contextmanager
+import six
-from mocker import Mocker
-from mocker import MockerTestCase
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers as ch
from cloudinit import util
-import shutil
-
# Used for detecting different python versions
PY2 = False
PY26 = False
@@ -33,8 +41,20 @@ else:
PY3 = True
if PY26:
- # For now add these on, taken from python 2.7 + slightly adjusted
+ # For now add these on, taken from python 2.7 + slightly adjusted. Drop
+ # all this once Python 2.6 is dropped as a minimum requirement.
class TestCase(unittest.TestCase):
+ def setUp(self):
+ super(TestCase, self).setUp()
+ self.__all_cleanups = ExitStack()
+
+ def tearDown(self):
+ self.__all_cleanups.close()
+ unittest.TestCase.tearDown(self)
+
+ def addCleanup(self, function, *args, **kws):
+ self.__all_cleanups.callback(function, *args, **kws)
+
def assertIs(self, expr1, expr2, msg=None):
if expr1 is not expr2:
standardMsg = '%r is not %r' % (expr1, expr2)
@@ -57,10 +77,17 @@ if PY26:
standardMsg = standardMsg % (value)
self.fail(self._formatMessage(msg, standardMsg))
+ def assertIsInstance(self, obj, cls, msg=None):
+ """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
+ default message."""
+ if not isinstance(obj, cls):
+ standardMsg = '%s is not an instance of %r' % (repr(obj), cls)
+ self.fail(self._formatMessage(msg, standardMsg))
+
def assertDictContainsSubset(self, expected, actual, msg=None):
missing = []
mismatched = []
- for k, v in expected.iteritems():
+ for k, v in expected.items():
if k not in actual:
missing.append(k)
elif actual[k] != v:
@@ -86,17 +113,6 @@ else:
pass
-@contextmanager
-def mocker(verify_calls=True):
- m = Mocker()
- try:
- yield m
- finally:
- m.restore()
- if verify_calls:
- m.verify()
-
-
# Makes the old path start
# with new base instead of whatever
# it previously had
@@ -121,14 +137,19 @@ def retarget_many_wrapper(new_base, am, old_func):
nam = len(n_args)
for i in range(0, nam):
path = args[i]
- n_args[i] = rebase_path(path, new_base)
+ # patchOS() wraps various os and os.path functions, however in
+ # Python 3 some of these now accept file-descriptors (integers).
+ # That breaks rebase_path() so in lieu of a better solution, just
+ # don't rebase if we get a fd.
+ if isinstance(path, six.string_types):
+ n_args[i] = rebase_path(path, new_base)
return old_func(*n_args, **kwds)
return wrapper
-class ResourceUsingTestCase(MockerTestCase):
- def __init__(self, methodName="runTest"):
- MockerTestCase.__init__(self, methodName)
+class ResourceUsingTestCase(TestCase):
+ def setUp(self):
+ super(ResourceUsingTestCase, self).setUp()
self.resource_path = None
def resourceLocation(self, subname=None):
@@ -156,17 +177,23 @@ class ResourceUsingTestCase(MockerTestCase):
return fh.read()
def getCloudPaths(self):
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
cp = ch.Paths({
- 'cloud_dir': self.makeDir(),
+ 'cloud_dir': tmpdir,
'templates_dir': self.resourceLocation(),
})
return cp
class FilesystemMockingTestCase(ResourceUsingTestCase):
- def __init__(self, methodName="runTest"):
- ResourceUsingTestCase.__init__(self, methodName)
- self.patched_funcs = []
+ def setUp(self):
+ super(FilesystemMockingTestCase, self).setUp()
+ self.patched_funcs = ExitStack()
+
+ def tearDown(self):
+ self.patched_funcs.close()
+ ResourceUsingTestCase.tearDown(self)
def replicateTestRoot(self, example_root, target_root):
real_root = self.resourceLocation()
@@ -180,15 +207,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
make_path = util.abs_join(make_path, f)
shutil.copy(real_path, make_path)
- def tearDown(self):
- self.restore()
- ResourceUsingTestCase.tearDown(self)
-
- def restore(self):
- for (mod, f, func) in self.patched_funcs:
- setattr(mod, f, func)
- self.patched_funcs = []
-
def patchUtils(self, new_root):
patch_funcs = {
util: [('write_file', 1),
@@ -205,8 +223,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for (f, am) in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, am, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
# Handle subprocess calls
func = getattr(util, 'subp')
@@ -214,16 +232,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def nsubp(*_args, **_kwargs):
return ('', '')
- setattr(util, 'subp', nsubp)
- self.patched_funcs.append((util, 'subp', func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', nsubp))
def null_func(*_args, **_kwargs):
return None
for f in ['chownbyid', 'chownbyname']:
- func = getattr(util, f)
- setattr(util, f, null_func)
- self.patched_funcs.append((util, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, f, null_func))
def patchOS(self, new_root):
patch_funcs = {
@@ -234,8 +251,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for f in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, 1, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
class HttprettyTestCase(TestCase):
@@ -256,7 +273,21 @@ class HttprettyTestCase(TestCase):
def populate_dir(path, files):
if not os.path.exists(path):
os.makedirs(path)
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
with open(os.path.join(path, name), "w") as fp:
fp.write(content)
fp.close()
+
+try:
+ skipIf = unittest.skipIf
+except AttributeError:
+ # Python 2.6. Doesn't have to be high fidelity.
+ def skipIf(condition, reason):
+ def decorator(func):
+ def wrapper(*args, **kws):
+ if condition:
+ return func(*args, **kws)
+ else:
+ print(reason, file=sys.stderr)
+ return wrapper
+ return decorator
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 17965488..1a307e56 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,14 +1,25 @@
import os
-
-from mocker import MockerTestCase, ARGS, KWARGS
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import handlers
from cloudinit import helpers
-from cloudinit import importer
from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
+from .helpers import TestCase
+
class FakeModule(handlers.Handler):
def __init__(self):
@@ -22,76 +33,73 @@ class FakeModule(handlers.Handler):
pass
-class TestWalkerHandleHandler(MockerTestCase):
+class TestWalkerHandleHandler(TestCase):
def setUp(self):
-
- MockerTestCase.setUp(self)
+ super(TestWalkerHandleHandler, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.data = {
"handlercount": 0,
"frequency": "",
- "handlerdir": self.makeDir(),
+ "handlerdir": tmpdir,
"handlers": helpers.ContentHandlers(),
"data": None}
self.expected_module_name = "part-handler-%03d" % (
self.data["handlercount"],)
expected_file_name = "%s.py" % self.expected_module_name
- expected_file_fullname = os.path.join(self.data["handlerdir"],
- expected_file_name)
+ self.expected_file_fullname = os.path.join(
+ self.data["handlerdir"], expected_file_name)
self.module_fake = FakeModule()
self.ctype = None
self.filename = None
self.payload = "dummy payload"
- # Mock the write_file function
- write_file_mock = self.mocker.replace(util.write_file,
- passthrough=False)
- write_file_mock(expected_file_fullname, self.payload, 0600)
+ # Mock the write_file() function. We'll assert that it got called as
+ # expected in each of the individual tests.
+ resources = ExitStack()
+ self.addCleanup(resources.close)
+ self.write_file_mock = resources.enter_context(
+ mock.patch('cloudinit.util.write_file'))
def test_no_errors(self):
"""Payload gets written to file and added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.result(self.module_fake)
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
-
- self.assertEqual(1, self.data["handlercount"])
+ with mock.patch('cloudinit.importer.import_module',
+ return_value=self.module_fake) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 1)
def test_import_error(self):
"""Module import errors are logged. No handler added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.throw(ImportError())
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
-
- self.assertEqual(0, self.data["handlercount"])
+ with mock.patch('cloudinit.importer.import_module',
+ side_effect=ImportError) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 0)
def test_attribute_error(self):
"""Attribute errors are logged. No handler added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.result(self.module_fake)
- self.mocker.throw(AttributeError())
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
+ with mock.patch('cloudinit.importer.import_module',
+ side_effect=AttributeError,
+ return_value=self.module_fake) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 0)
- self.assertEqual(0, self.data["handlercount"])
-
-class TestHandlerHandlePart(MockerTestCase):
+class TestHandlerHandlePart(unittest.TestCase):
def setUp(self):
self.data = "fake data"
@@ -108,95 +116,80 @@ class TestHandlerHandlePart(MockerTestCase):
C{handle_part} is called without C{frequency} for
C{handler_version} == 1.
"""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_normal_version_2(self):
"""
C{handle_part} is called with C{frequency} for
C{handler_version} == 2.
"""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(2)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload, self.frequency)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=2)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_modfreq_per_always(self):
"""
C{handle_part} is called regardless of frequency if nofreq is always.
"""
self.frequency = "once"
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_ALWAYS)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_ALWAYS,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once."""
self.frequency = "once"
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_ONCE)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_ONCE)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.throw(Exception())
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
-
-
-class TestCmdlineUrl(MockerTestCase):
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ mod_mock.handle_part.side_effect = Exception
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
+
+
+class TestCmdlineUrl(unittest.TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
payload = "0"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl,
- passthrough=False)
- mock_readurl(url, ARGS, KWARGS)
- self.mocker.result(url_helper.StringResponse(payload))
- self.mocker.replay()
-
- self.assertEqual((key, url, None),
- util.get_cmdline_url(names=[key], starts="xxxxxx",
- cmdline=cmdline))
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse(payload)):
+ self.assertEqual(
+ util.get_cmdline_url(names=[key], starts="xxxxxx",
+ cmdline=cmdline),
+ (key, url, None))
def test_valid_content(self):
url = "http://example.com/foo"
@@ -204,27 +197,24 @@ class TestCmdlineUrl(MockerTestCase):
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl,
- passthrough=False)
- mock_readurl(url, ARGS, KWARGS)
- self.mocker.result(url_helper.StringResponse(payload))
- self.mocker.replay()
-
- self.assertEqual((key, url, payload),
- util.get_cmdline_url(names=[key], starts="xcloud-config",
- cmdline=cmdline))
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse(payload)):
+ self.assertEqual(
+ util.get_cmdline_url(names=[key], starts="xcloud-config",
+ cmdline=cmdline),
+ (key, url, payload))
def test_no_key_found(self):
url = "http://example.com/foo"
key = "mykey"
cmdline = "ro %s=%s bar=1" % (key, url)
- self.mocker.replace(url_helper.readurl, passthrough=False)
- self.mocker.result(url_helper.StringResponse(""))
- self.mocker.replay()
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse('')):
+ self.assertEqual(
+ util.get_cmdline_url(names=["does-not-appear"],
+ starts="#cloud-config", cmdline=cmdline),
+ (None, None, None))
- self.assertEqual((None, None, None),
- util.get_cmdline_url(names=["does-not-appear"],
- starts="#cloud-config", cmdline=cmdline))
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index af7f442e..ad32d0b2 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,6 +1,13 @@
"""Tests of the built-in user data handlers."""
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from . import helpers as test_helpers
@@ -14,10 +21,11 @@ from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
class TestBuiltins(test_helpers.FilesystemMockingTestCase):
-
def test_upstart_frequency_no_out(self):
- c_root = self.makeDir()
- up_root = self.makeDir()
+ c_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, c_root)
+ up_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, up_root)
paths = helpers.Paths({
'cloud_dir': c_root,
'upstart_dir': up_root,
@@ -36,7 +44,8 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
def test_upstart_frequency_single(self):
# files should be written out when frequency is ! per-instance
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
freq = PER_INSTANCE
self.patchOS(new_root)
@@ -49,16 +58,16 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
util.ensure_dir("/run")
util.ensure_dir("/etc/upstart")
- mock_subp = self.mocker.replace(util.subp, passthrough=False)
- mock_subp(["initctl", "reload-configuration"], capture=False)
- self.mocker.replay()
+ with mock.patch.object(util, 'subp') as mockobj:
+ h = upstart_job.UpstartJobPartHandler(paths)
+ h.handle_part('', handlers.CONTENT_START,
+ None, None, None)
+ h.handle_part('blah', 'text/upstart-job',
+ 'test.conf', 'blah', freq)
+ h.handle_part('', handlers.CONTENT_END,
+ None, None, None)
- h = upstart_job.UpstartJobPartHandler(paths)
- h.handle_part('', handlers.CONTENT_START,
- None, None, None)
- h.handle_part('blah', 'text/upstart-job',
- 'test.conf', 'blah', freq)
- h.handle_part('', handlers.CONTENT_END,
- None, None, None)
+ self.assertEquals(len(os.listdir('/etc/upstart')), 1)
- self.assertEquals(1, len(os.listdir('/etc/upstart')))
+ mockobj.assert_called_once_with(
+ ['initctl', 'reload-configuration'], capture=False)
diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py
index 7d59222b..d7273035 100644
--- a/tests/unittests/test_cs_util.py
+++ b/tests/unittests/test_cs_util.py
@@ -1,7 +1,21 @@
-from mocker import MockerTestCase
+from __future__ import print_function
+
+import sys
+import unittest
from cloudinit.cs_utils import Cepko
+try:
+ skip = unittest.skip
+except AttributeError:
+ # Python 2.6. Doesn't have to be high fidelity.
+ def skip(reason):
+ def decorator(func):
+ def wrapper(*args, **kws):
+ print(reason, file=sys.stderr)
+ return wrapper
+ return decorator
+
SERVER_CONTEXT = {
"cpu": 1000,
@@ -26,16 +40,21 @@ class CepkoMock(Cepko):
return SERVER_CONTEXT['tags']
-class CepkoResultTests(MockerTestCase):
+# 2015-01-22 BAW: This test is completely useless because it only ever tests
+# the CepkoMock object. Even in its original form, I don't think it ever
+# touched the underlying Cepko class methods.
+@skip('This test is completely useless')
+class CepkoResultTests(unittest.TestCase):
def setUp(self):
- self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
- spec=CepkoMock,
- count=False,
- passthrough=False)
- self.mocked()
- self.mocker.result(CepkoMock())
- self.mocker.replay()
- self.c = Cepko()
+ pass
+ # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
+ # spec=CepkoMock,
+ # count=False,
+ # passthrough=False)
+ # self.mocked()
+ # self.mocker.result(CepkoMock())
+ # self.mocker.replay()
+ # self.c = Cepko()
def test_getitem(self):
result = self.c.all()
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index fd6bd8a1..e5b227f8 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -1,10 +1,17 @@
"""Tests for handling of userdata within cloud init."""
-import StringIO
-
import gzip
import logging
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from six import BytesIO, StringIO
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
@@ -37,23 +44,22 @@ class FakeDataSource(sources.DataSource):
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
+ super(TestConsumeUserData, self).setUp()
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
+ helpers.FilesystemMockingTestCase.tearDown(self)
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def capture_log(self, lvl=logging.DEBUG):
- log_file = StringIO.StringIO()
+ log_file = StringIO()
self._log_handler = logging.StreamHandler(log_file)
self._log_handler.setLevel(lvl)
self._log = log.getLogger()
@@ -71,7 +77,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
ci = stages.Init()
ci.datasource = FakeDataSource(blob)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -99,7 +106,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -138,7 +146,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -184,7 +193,8 @@ c: d
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -214,7 +224,8 @@ name: user
run:
- z
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -249,7 +260,8 @@ vendor_data:
enabled: True
prefix: /bin/true
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -309,7 +321,8 @@ p: 1
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
@@ -335,25 +348,24 @@ p: 1
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled non-multipart (text/x-not-multipart) userdata:",
+ log_file.getvalue())
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled non-multipart (text/x-not-multipart) userdata:",
- log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_mime_gzip_compressed(self):
"""Tests that individual message gzip encoding works."""
def gzip_part(text):
- contents = StringIO.StringIO()
- f = gzip.GzipFile(fileobj=contents, mode='w')
- f.write(str(text))
+ contents = BytesIO()
+ f = gzip.GzipFile(fileobj=contents, mode='wb')
+ f.write(util.encode_text(text))
f.flush()
f.close()
return MIMEApplication(contents.getvalue(), 'gzip')
@@ -374,7 +386,8 @@ c: 4
message.attach(gzip_part(base_content2))
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -394,17 +407,15 @@ c: 4
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script."""
@@ -413,16 +424,17 @@ c: 4
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script."""
@@ -433,16 +445,17 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script."""
@@ -453,13 +466,14 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(outpath, script, 0700)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 1a48ee5f..e9cd2fa5 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -46,7 +46,7 @@ def _write_cloud_info_file(value):
cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
def _remove_cloud_info_file():
@@ -67,12 +67,12 @@ def _write_user_data_files(mount_dir, value):
udfile = open(deltacloud_user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(deltacloud_user_data_file, 0664)
+ os.chmod(deltacloud_user_data_file, 0o664)
udfile = open(user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(user_data_file, 0664)
+ os.chmod(user_data_file, 0o664)
def _remove_user_data_files(mount_dir,
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e992a006..965bce4b 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,14 +1,24 @@
from cloudinit import helpers
-from cloudinit.util import load_file
+from cloudinit.util import b64e, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-import base64
import crypt
-from mocker import MockerTestCase
import os
import stat
import yaml
+import shutil
+import tempfile
+import unittest
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -40,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64e(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
@@ -66,26 +76,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
return content
-class TestAzureDataSource(MockerTestCase):
+class TestAzureDataSource(TestCase):
def setUp(self):
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
+ super(TestAzureDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.unapply = []
- super(TestAzureDataSource, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestAzureDataSource, self).tearDown()
+ super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
+ for module, name, new in patches:
+ self.patches.enter_context(mock.patch.object(module, name, new))
def _get_ds(self, data):
@@ -117,16 +126,14 @@ class TestAzureDataSource(MockerTestCase):
mod = DataSourceAzure
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
-
- self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files',
- _pubkeys_from_crt_files),
- (mod, 'iid_from_shared_config',
- _iid_from_shared_config),
- (mod, 'apply_hostname_bounce',
- _apply_hostname_bounce), ])
+ self.apply_patches([
+ (mod, 'list_possible_azure_ds_devs', dsdevs),
+ (mod, 'invoke_agent', _invoke_agent),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+ (mod, 'iid_from_shared_config', _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
+ ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
@@ -153,7 +160,7 @@ class TestAzureDataSource(MockerTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(os.path.isdir(self.waagent_d))
- self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700)
+ self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
def test_user_cfg_set_agent_command_plain(self):
# set dscfg in via plaintext
@@ -174,7 +181,7 @@ class TestAzureDataSource(MockerTestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
@@ -226,13 +233,13 @@ class TestAzureDataSource(MockerTestCase):
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': base64.b64encode(mydata)}
+ odata = {'UserData': b64e(mydata)}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata)
+ self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
def test_no_datasource_expected(self):
# no source should be found if no seed_dir and no devs
@@ -274,7 +281,7 @@ class TestAzureDataSource(MockerTestCase):
'command': 'my-bounce-command',
'hostname_command': 'my-hostname-command'}}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -289,7 +296,7 @@ class TestAzureDataSource(MockerTestCase):
# config specifying set_hostname off should not bounce
cfg = {'set_hostname': False}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -318,7 +325,7 @@ class TestAzureDataSource(MockerTestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
@@ -340,7 +347,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual(userdata, dsrc.userdata_raw)
+ self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -355,7 +362,7 @@ class TestAzureDataSource(MockerTestCase):
def test_existing_ovf_same(self):
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
+ odata = {'UserData': b64e("SOMEUSERDATA")}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
populate_dir(self.waagent_d,
@@ -379,9 +386,9 @@ class TestAzureDataSource(MockerTestCase):
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
# if ovf-env.xml differs.
cached_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("FOO_USERDATA")})
+ {'userdata': b64e("FOO_USERDATA")})
new_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("NEW_USERDATA")})
+ {'userdata': b64e("NEW_USERDATA")})
populate_dir(self.waagent_d,
{'ovf-env.xml': cached_ovfenv,
@@ -391,7 +398,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds({'ovfcontent': new_ovfenv})
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
+ self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
self.assertTrue(os.path.exists(
os.path.join(self.waagent_d, 'otherfile')))
self.assertFalse(
@@ -402,7 +409,7 @@ class TestAzureDataSource(MockerTestCase):
load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
-class TestReadAzureOvf(MockerTestCase):
+class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
@@ -417,7 +424,7 @@ class TestReadAzureOvf(MockerTestCase):
self.assertIn(mypk, cfg['_pubkeys'])
-class TestReadAzureSharedConfig(MockerTestCase):
+class TestReadAzureSharedConfig(unittest.TestCase):
def test_valid_content(self):
xml = """<?xml version="1.0" encoding="utf-8"?>
<SharedConfig>
@@ -429,14 +436,3 @@ class TestReadAzureSharedConfig(MockerTestCase):
</SharedConfig>"""
ret = DataSourceAzure.iid_from_shared_config_content(xml)
self.assertEqual("MY_INSTANCE_ID", ret)
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 306ac7d8..772d189a 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -39,6 +39,7 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
+ super(DataSourceCloudSigmaTest, self).setUp()
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..e28bdd84 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,17 @@
from copy import copy
import json
import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
@@ -12,7 +19,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from .. import helpers as unit_helpers
+from ..helpers import TestCase
+
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -64,11 +72,12 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(MockerTestCase):
+class TestConfigDriveDataSource(TestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +100,29 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[provided_name]))
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+
+ def exists_side_effect():
+ yield False
+ yield True
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ self.assertEqual(exists_mock.call_count, 2)
+
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +138,19 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ with ExitStack() as mocks:
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[dev_name]))
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ return_value=True))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ exists_mock.assert_called_once_with(mock.ANY)
+
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +171,21 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker(verify_calls=False) as my_mock:
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ with mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ # We don't assert the call count for os.path.exists() because
+ # not all of the entries in name_tests results in two calls to
+ # that function. Specifically, 'root2k' doesn't seem to call
+ # it at all.
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +193,6 @@ class TestConfigDriveDataSource(MockerTestCase):
None,
helpers.Paths({}))
found = ds.read_config_drive(self.tmp)
- exists_mock = self.mocker.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(True)
- self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +207,9 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ with mock.patch.object(os.path, 'exists', return_value=True):
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -326,7 +341,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
def populate_dir(seed_dir, files):
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index d1270fc2..98f9cfac 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -18,8 +18,7 @@
import httpretty
import re
-from types import ListType
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -110,7 +109,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual([DO_META.get('public-keys')],
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
@httpretty.activate
def test_multiple_ssh_keys(self):
@@ -124,4 +123,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual(DO_META.get('public-keys').splitlines(),
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 06050bb1..6dd4b5ed 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -19,7 +19,7 @@ import httpretty
import re
from base64 import b64encode, b64decode
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -45,7 +45,7 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode('/bin/echo baz\n'),
+ 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
'instance/attributes/user-data-encoding': 'base64',
}
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c157beb8..d25e1adc 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,19 +1,25 @@
from copy import copy
import os
+import shutil
+import tempfile
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
-class TestMAASDataSource(mocker.MockerTestCase):
+class TestMAASDataSource(TestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
@@ -93,16 +99,18 @@ class TestMAASDataSource(mocker.MockerTestCase):
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
- valid = {'meta-data/instance-id': 'i-instanceid',
+ valid = {
+ 'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
- 'user-data': 'foodata'}
+ 'user-data': 'foodata',
+ }
valid_order = [
'meta-data/local-hostname',
'meta-data/instance-id',
'meta-data/public-keys',
'user-data',
- ]
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
@@ -110,28 +118,38 @@ class TestMAASDataSource(mocker.MockerTestCase):
def my_headers_cb(url):
return my_headers
- mock_request = self.mocker.replace(url_helper.readurl,
- passthrough=False)
-
- for key in valid_order:
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- mock_request(url, headers=None, timeout=mocker.ANY,
- data=mocker.ANY, sec_between=mocker.ANY,
- ssl_details=mocker.ANY, retries=mocker.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mocker.ANY)
- resp = valid.get(key)
- self.mocker.result(url_helper.StringResponse(resp))
- self.mocker.replay()
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
- header_cb=my_headers_cb, version=my_ver)
-
- self.assertEqual("foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
+ # Each time url_helper.readurl() is called, something different is
+ # returned based on the canned data above. We need to build up a list
+ # of side effect return values, which the mock will return. At the
+ # same time, we'll build up a list of expected call arguments for
+ # asserting after the code under test is run.
+ calls = []
+
+ def side_effect():
+ for key in valid_order:
+ resp = valid.get(key)
+ url = "%s/%s/%s" % (my_seed, my_ver, key)
+ calls.append(
+ mock.call(url, headers=None, timeout=mock.ANY,
+ data=mock.ANY, sec_between=mock.ANY,
+ ssl_details=mock.ANY, retries=mock.ANY,
+ headers_cb=my_headers_cb,
+ exception_cb=mock.ANY))
+ yield url_helper.StringResponse(resp)
+
+ # Now do the actual call of the code under test.
+ with mock.patch.object(url_helper, 'readurl',
+ side_effect=side_effect()) as mockobj:
+ userdata, metadata = DataSourceMAAS.read_maas_seed_url(
+ my_seed, header_cb=my_headers_cb, version=my_ver)
+
+ self.assertEqual("foodata", userdata)
+ self.assertEqual(metadata['instance-id'],
+ valid['meta-data/instance-id'])
+ self.assertEqual(metadata['local-hostname'],
+ valid['meta-data/local-hostname'])
+
+ mockobj.has_calls(calls)
def test_seed_url_invalid(self):
"""Verify that invalid seed_url raises MAASSeedDirMalformed."""
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index e9235951..4f967f58 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,35 +1,39 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from mocker import MockerTestCase
import os
import yaml
+import shutil
+import tempfile
+import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoCloudDataSource(MockerTestCase):
+
+class TestNoCloudDataSource(TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ super(TestNoCloudDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
- self.unapply = []
- self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
- super(TestNoCloudDataSource, self).setUp()
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).tearDown()
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _getcmdline(self):
- return self.cmdline
+ self.mocks.enter_context(
+ mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
@@ -59,7 +63,9 @@ class TestNoCloudDataSource(MockerTestCase):
def my_find_devs_with(*args, **kwargs):
raise PsuedoException
- self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+ self.mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ side_effect=PsuedoException))
# by default, NoCloud should search for filesystems by label
sys_cfg = {'datasource': {'NoCloud': {}}}
@@ -85,7 +91,7 @@ class TestNoCloudDataSource(MockerTestCase):
data = {
'fs_label': None,
- 'meta-data': {'instance-id': 'IID'},
+ 'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
'user-data': "USER_DATA_RAW",
}
@@ -133,7 +139,7 @@ class TestNoCloudDataSource(MockerTestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(MockerTestCase):
+class TestParseCommandLineData(unittest.TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
@@ -178,15 +184,4 @@ class TestParseCommandLineData(MockerTestCase):
self.assertFalse(ret)
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index b4fd1f4d..27adf21b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,12 +1,14 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from mocker import MockerTestCase
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from base64 import b64encode
import os
import pwd
+import shutil
+import tempfile
+import unittest
+
TEST_VARS = {
'VAR1': 'single',
@@ -37,12 +39,13 @@ CMD_IP_OUT = '''\
'''
-class TestOpenNebulaDataSource(MockerTestCase):
+class TestOpenNebulaDataSource(TestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
# defaults for few tests
@@ -176,7 +179,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64encode(USER_DATA)
+ b64userdata = util.b64e(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -188,7 +191,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ populate_context_dir(my_d, {k: util.b64e(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
@@ -228,7 +231,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
util.find_devs_with = orig_find_devs_with
-class TestOpenNebulaNetwork(MockerTestCase):
+class TestOpenNebulaNetwork(unittest.TestCase):
def setUp(self):
super(TestOpenNebulaNetwork, self).setUp()
@@ -280,7 +283,7 @@ iface eth0 inet static
''')
-class TestParseShellConfig(MockerTestCase):
+class TestParseShellConfig(unittest.TestCase):
def test_no_seconds(self):
cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
# we could test 'sleep 2', but that would make the test run slower.
@@ -290,7 +293,7 @@ class TestParseShellConfig(MockerTestCase):
def populate_context_dir(path, variables):
data = "# Context variables generated by OpenNebula\n"
- for (k, v) in variables.iteritems():
+ for k, v in variables.items():
data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
populate_dir(path, {'context.sh': data})
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 49894e51..81ef1546 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -20,12 +20,11 @@ import copy
import json
import re
-from StringIO import StringIO
-
-from urlparse import urlparse
-
from .. import helpers as test_helpers
+from six import StringIO
+from six.moves.urllib.parse import urlparse
+
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceOpenStack as ds
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 65675106..8b62b1b1 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,16 +22,21 @@
# return responses.
#
-import base64
+from __future__ import print_function
+
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
from .. import helpers
import os
import os.path
import re
+import shutil
+import tempfile
import stat
import uuid
+
MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
@@ -107,11 +112,12 @@ class MockSerial(object):
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
+ super(TestSmartOSDataSource, self).setUp()
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
- self.legacy_user_d = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.legacy_user_d = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.legacy_user_d)
# If you should want to watch the logs...
self._log = None
@@ -227,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -248,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -264,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -365,7 +371,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
- print name_f
+ print(name_f)
self.assertEquals(permissions, '400')
self.assertFalse(found_new)
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index db6aa0e8..35153f0d 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -4,6 +4,8 @@ from cloudinit import util
from .. import helpers
import os
+import shutil
+import tempfile
unknown_arch_info = {
'arches': ['default'],
@@ -53,7 +55,8 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestGenericDistro, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def _write_load_sudoers(self, _user, rules):
cls = distros.fetch("ubuntu")
@@ -64,7 +67,6 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
d.write_sudo_rules("harlowja", rules)
contents = util.load_file(d.ci_sudoers_fn)
- self.restore()
return contents
def _count_in(self, lines_look_for, text_content):
diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py
index 8e644f4d..143e29a9 100644
--- a/tests/unittests/test_distros/test_hostname.py
+++ b/tests/unittests/test_distros/test_hostname.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit.distros.parsers import hostname
@@ -12,7 +12,7 @@ blahblah
BASE_HOSTNAME = BASE_HOSTNAME.strip()
-class TestHostnameHelper(MockerTestCase):
+class TestHostnameHelper(unittest.TestCase):
def test_parse_same(self):
hn = hostname.HostnameConf(BASE_HOSTNAME)
self.assertEquals(str(hn).strip(), BASE_HOSTNAME)
diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py
index 687a0dab..fc701eaa 100644
--- a/tests/unittests/test_distros/test_hosts.py
+++ b/tests/unittests/test_distros/test_hosts.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit.distros.parsers import hosts
@@ -14,7 +14,7 @@ BASE_ETC = '''
BASE_ETC = BASE_ETC.strip()
-class TestHostsHelper(MockerTestCase):
+class TestHostsHelper(unittest.TestCase):
def test_parse(self):
eh = hosts.HostsConf(BASE_ETC)
self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']])
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index 193338e8..6d30c5b8 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -1,8 +1,16 @@
-from mocker import MockerTestCase
+import os
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-import os
+from six import StringIO
+from ..helpers import TestCase
from cloudinit import distros
from cloudinit import helpers
@@ -11,8 +19,6 @@ from cloudinit import util
from cloudinit.distros.parsers.sys_conf import SysConf
-from StringIO import StringIO
-
BASE_NET_CFG = '''
auto lo
@@ -74,7 +80,7 @@ class WriteBuffer(object):
return self.buffer.getvalue()
-class TestNetCfgDistro(MockerTestCase):
+class TestNetCfgDistro(TestCase):
def _get_distro(self, dname):
cls = distros.fetch(dname)
@@ -85,34 +91,28 @@ class TestNetCfgDistro(MockerTestCase):
def test_simple_write_ub(self):
ub_distro = self._get_distro('ubuntu')
- util_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
+ write_bufs = {}
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
+ def replace_write(filename, content, mode=0o644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
- write_bufs = {}
-
- def replace_write(filename, content, mode=0644, omode="wb"):
- buf = WriteBuffer()
- buf.mode = mode
- buf.omode = omode
- buf.write(content)
- write_bufs[filename] = buf
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
- util_mock(mocker.ARGS)
- self.mocker.call(replace_write)
- self.mocker.replay()
- ub_distro.apply_network(BASE_NET_CFG, False)
+ ub_distro.apply_network(BASE_NET_CFG, False)
- self.assertEquals(len(write_bufs), 1)
- self.assertIn('/etc/network/interfaces', write_bufs)
- write_buf = write_bufs['/etc/network/interfaces']
- self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
- self.assertEquals(write_buf.mode, 0644)
+ self.assertEquals(len(write_bufs), 1)
+ self.assertIn('/etc/network/interfaces', write_bufs)
+ write_buf = write_bufs['/etc/network/interfaces']
+ self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
+ self.assertEquals(write_buf.mode, 0o644)
def assertCfgEquals(self, blob1, blob2):
b1 = dict(SysConf(blob1.strip().splitlines()))
@@ -127,53 +127,41 @@ class TestNetCfgDistro(MockerTestCase):
def test_simple_write_rh(self):
rh_distro = self._get_distro('rhel')
- write_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
write_bufs = {}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
buf.write(content)
write_bufs[filename] = buf
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
-
- load_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result('')
-
- for _i in range(0, 3):
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- self.mocker.replay()
- rh_distro.apply_network(BASE_NET_CFG, False)
-
- self.assertEquals(len(write_bufs), 4)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', return_value=''))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
+
+ rh_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
DEVICE="lo"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
DEVICE="eth0"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -182,77 +170,66 @@ ONBOOT=yes
GATEWAY="192.168.1.254"
BROADCAST="192.168.1.0"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
DEVICE="eth1"
BOOTPROTO="dhcp"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
# Created by cloud-init v. 0.7
NETWORKING=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
def test_write_ipv6_rhel(self):
rh_distro = self._get_distro('rhel')
- write_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
write_bufs = {}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
buf.write(content)
write_bufs[filename] = buf
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
-
- load_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result('')
-
- for _i in range(0, 3):
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- self.mocker.replay()
- rh_distro.apply_network(BASE_NET_CFG_IPV6, False)
-
- self.assertEquals(len(write_bufs), 4)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', return_value=''))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
+
+ rh_distro.apply_network(BASE_NET_CFG_IPV6, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
DEVICE="lo"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
DEVICE="eth0"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -264,11 +241,12 @@ IPV6INIT=yes
IPV6ADDR="2607:f0d0:1002:0011::2"
IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
- expected_buf = '''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
DEVICE="eth1"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -280,38 +258,22 @@ IPV6INIT=yes
IPV6ADDR="2607:f0d0:1002:0011::3"
IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
# Created by cloud-init v. 0.7
NETWORKING=yes
NETWORKING_IPV6=yes
IPV6_AUTOCONF=no
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
def test_simple_write_freebsd(self):
fbsd_distro = self._get_distro('freebsd')
- util_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- subp_mock = self.mocker.replace(util.subp,
- spec=False, passthrough=False)
-
- subp_mock(['ifconfig', '-a'])
- self.mocker.count(0, None)
- self.mocker.result(('vtnet0', ''))
-
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
write_bufs = {}
read_bufs = {
@@ -319,7 +281,7 @@ IPV6_AUTOCONF=no
'/etc/resolv.conf': '',
}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
@@ -336,23 +298,24 @@ IPV6_AUTOCONF=no
return str(write_bufs[fname])
return read_bufs[fname]
- util_mock(mocker.ARGS)
- self.mocker.call(replace_write)
- self.mocker.count(0, None)
-
- load_mock(mocker.ARGS)
- self.mocker.call(replace_read)
- self.mocker.count(0, None)
-
- self.mocker.replay()
- fbsd_distro.apply_network(BASE_NET_CFG, False)
-
- self.assertIn('/etc/rc.conf', write_bufs)
- write_buf = write_bufs['/etc/rc.conf']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'subp', return_value=('vtnet0', '')))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'exists', return_value=False))
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', replace_read))
+
+ fbsd_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertIn('/etc/rc.conf', write_bufs)
+ write_buf = write_bufs['/etc/rc.conf']
+ expected_buf = '''
ifconfig_vtnet0="192.168.1.5 netmask 255.255.255.0"
ifconfig_vtnet1="DHCP"
defaultrouter="192.168.1.254"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py
index 6b6ff6aa..faaf5b7f 100644
--- a/tests/unittests/test_distros/test_resolv.py
+++ b/tests/unittests/test_distros/test_resolv.py
@@ -1,8 +1,7 @@
-from mocker import MockerTestCase
-
from cloudinit.distros.parsers import resolv_conf
import re
+from ..helpers import TestCase
BASE_RESOLVE = '''
@@ -14,7 +13,7 @@ nameserver 10.15.30.92
BASE_RESOLVE = BASE_RESOLVE.strip()
-class TestResolvHelper(MockerTestCase):
+class TestResolvHelper(TestCase):
def test_parse_same(self):
rp = resolv_conf.ResolvConf(BASE_RESOLVE)
rp_r = str(rp).strip()
diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py
index 0c651407..03d89a10 100644
--- a/tests/unittests/test_distros/test_sysconfig.py
+++ b/tests/unittests/test_distros/test_sysconfig.py
@@ -1,14 +1,13 @@
-from mocker import MockerTestCase
-
import re
from cloudinit.distros.parsers.sys_conf import SysConf
+from ..helpers import TestCase
# Lots of good examples @
# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
-class TestSysConfHelper(MockerTestCase):
+class TestSysConfHelper(TestCase):
# This function was added in 2.7, make it work for 2.6
def assertRegMatches(self, text, regexp):
regexp = re.compile(regexp)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index 50398c74..e4488e2a 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -1,9 +1,10 @@
-from mocker import MockerTestCase
-
from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
+from ..helpers import TestCase
+
+
bcfg = {
'name': 'bob',
'plain_text_passwd': 'ubuntu',
@@ -15,7 +16,7 @@ bcfg = {
}
-class TestUGNormalize(MockerTestCase):
+class TestUGNormalize(TestCase):
def _make_distro(self, dtype, def_user=None):
cfg = dict(settings.CFG_BUILTIN)
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
index 2f4c2fda..95d24b9b 100644
--- a/tests/unittests/test_filters/test_launch_index.py
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -2,7 +2,7 @@ import copy
from .. import helpers
-import itertools
+from six.moves import filterfalse
from cloudinit.filters import launch_index
from cloudinit import user_data as ud
@@ -36,11 +36,9 @@ class TestLaunchFilter(helpers.ResourceUsingTestCase):
return False
# Do some basic payload checking
msg1_msgs = [m for m in msg1.walk()]
- msg1_msgs = [m for m in
- itertools.ifilterfalse(ud.is_skippable, msg1_msgs)]
+ msg1_msgs = [m for m in filterfalse(ud.is_skippable, msg1_msgs)]
msg2_msgs = [m for m in msg2.walk()]
- msg2_msgs = [m for m in
- itertools.ifilterfalse(ud.is_skippable, msg2_msgs)]
+ msg2_msgs = [m for m in filterfalse(ud.is_skippable, msg2_msgs)]
for i in range(0, len(msg2_msgs)):
m1_msg = msg1_msgs[i]
m2_msg = msg2_msgs[i]
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
index 203dd2aa..d8fe9a4f 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -1,27 +1,27 @@
-from mocker import MockerTestCase
-
from cloudinit import util
from cloudinit.config import cc_apt_configure
+from ..helpers import TestCase
import os
import re
+import shutil
+import tempfile
+import unittest
-class TestAptProxyConfig(MockerTestCase):
+class TestAptProxyConfig(TestCase):
def setUp(self):
super(TestAptProxyConfig, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.pfile = os.path.join(self.tmp, "proxy.cfg")
self.cfile = os.path.join(self.tmp, "config.cfg")
def _search_apt_config(self, contents, ptype, value):
- print(
- r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, "flags=re.IGNORECASE")
- return(re.search(
+ return re.search(
r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, flags=re.IGNORECASE))
+ contents, flags=re.IGNORECASE)
def test_apt_proxy_written(self):
cfg = {'apt_proxy': 'myproxy'}
@@ -60,7 +60,7 @@ class TestAptProxyConfig(MockerTestCase):
contents = str(util.read_file_or_url(self.pfile))
- for ptype, pval in values.iteritems():
+ for ptype, pval in values.items():
self.assertTrue(self._search_apt_config(contents, ptype, pval))
def test_proxy_deleted(self):
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 0558023a..a6b9c0fd 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,15 +1,26 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import helpers
from cloudinit import util
from cloudinit.config import cc_ca_certs
+from ..helpers import TestCase
import logging
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoConfig(MockerTestCase):
+class TestNoConfig(unittest.TestCase):
def setUp(self):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
@@ -22,15 +33,20 @@ class TestNoConfig(MockerTestCase):
Test that nothing is done if no ca-certs configuration is provided.
"""
config = util.get_builtin_cfg()
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
- self.mocker.replay()
+ with ExitStack() as mocks:
+ util_mock = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ certs_mock = mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
- self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(util_mock.call_count, 0)
+ self.assertEqual(certs_mock.call_count, 0)
-class TestConfig(MockerTestCase):
+
+class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
@@ -39,16 +55,16 @@ class TestConfig(MockerTestCase):
self.log = logging.getLogger("TestNoConfig")
self.args = []
- # Mock out the functions that actually modify the system
- self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs,
- passthrough=False)
- self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
- passthrough=False)
- self.mock_remove = self.mocker.replace(
- cc_ca_certs.remove_default_ca_certs, passthrough=False)
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- # Order must be correct
- self.mocker.order()
+ # Mock out the functions that actually modify the system
+ self.mock_add = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'add_ca_certs'))
+ self.mock_update = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
+ self.mock_remove = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
def test_no_trusted_list(self):
"""
@@ -57,86 +73,88 @@ class TestConfig(MockerTestCase):
"""
config = {"ca-certs": {}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(["CERT1", "CERT2"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove()
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
+
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove()
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
-class TestAddCaCerts(MockerTestCase):
+
+class TestAddCaCerts(TestCase):
def setUp(self):
super(TestAddCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replay()
- cc_ca_certs.add_ca_certs([])
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_ca_certs.add_ca_certs([])
+ self.assertEqual(mockobj.call_count, 0)
def test_single_cert_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -146,19 +164,21 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
-
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/etc/ca-certificates.conf", expected, omode="wb")
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs([cert])
- cc_ca_certs.add_ca_certs([cert])
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf", expected, omode="wb"),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -167,75 +187,89 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\nline3"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
+ cc_ca_certs.add_ca_certs([cert])
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode="wb"),
+ ])
- mock_write("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
- omode="wb")
- self.mocker.replay()
-
- cc_ca_certs.add_ca_certs([cert])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
-
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0644)
-
ca_certs_content = "line1\nline2\nline3"
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
- out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
- mock_write("/etc/ca-certificates.conf", out, omode="wb")
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs(certs)
- cc_ca_certs.add_ca_certs(certs)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ expected_cert_file, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode='wb'),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-class TestUpdateCaCerts(MockerTestCase):
- def test_commands(self):
- mock_check_call = self.mocker.replace(util.subp,
- passthrough=False)
- mock_check_call(["update-ca-certificates"], capture=False)
- self.mocker.replay()
- cc_ca_certs.update_ca_certs()
+class TestUpdateCaCerts(unittest.TestCase):
+ def test_commands(self):
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_ca_certs.update_ca_certs()
+ mockobj.assert_called_once_with(
+ ["update-ca-certificates"], capture=False)
-class TestRemoveDefaultCaCerts(MockerTestCase):
+class TestRemoveDefaultCaCerts(TestCase):
def setUp(self):
super(TestRemoveDefaultCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_commands(self):
- mock_delete_dir_contents = self.mocker.replace(
- util.delete_dir_contents, passthrough=False)
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_subp = self.mocker.replace(util.subp,
- passthrough=False)
-
- mock_delete_dir_contents("/usr/share/ca-certificates/")
- mock_delete_dir_contents("/etc/ssl/certs/")
- mock_write("/etc/ca-certificates.conf", "", mode=0644)
- mock_subp(('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
- self.mocker.replay()
-
- cc_ca_certs.remove_default_ca_certs()
+ with ExitStack() as mocks:
+ mock_delete = mocks.enter_context(
+ mock.patch.object(util, 'delete_dir_contents'))
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_subp = mocks.enter_context(mock.patch.object(util, 'subp'))
+
+ cc_ca_certs.remove_default_ca_certs()
+
+ mock_delete.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/"),
+ mock.call("/etc/ssl/certs/"),
+ ])
+
+ mock_write.assert_called_once_with(
+ "/etc/ca-certificates.conf", "", mode=0o644)
+
+ mock_subp.assert_called_once_with(
+ ('debconf-set-selections', '-'),
+ "ca-certificates ca-certificates/trust_new_crts select no")
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index ef1aa208..edad88cb 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -11,15 +11,21 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
+import six
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
+CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"])
+
class TestChef(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestChef, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def fetch_cloud(self, distro_kind):
cls = distros.fetch(distro_kind)
@@ -37,9 +43,13 @@ class TestChef(t_help.FilesystemMockingTestCase):
for d in cc_chef.CHEF_DIRS:
self.assertFalse(os.path.isdir(d))
+ @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_basic_config(self):
- # This should create a file of the format...
"""
+ test basic config looks sane
+
+ # This should create a file of the format...
# Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000
log_level :info
ssl_verify_mode :verify_none
@@ -74,7 +84,7 @@ class TestChef(t_help.FilesystemMockingTestCase):
for k, v in cfg['chef'].items():
self.assertIn(v, c)
for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
- if isinstance(v, basestring):
+ if isinstance(v, six.string_types):
self.assertIn(v, c)
c = util.load_file(cc_chef.CHEF_FB_PATH)
self.assertEqual({}, json.loads(c))
@@ -101,6 +111,8 @@ class TestChef(t_help.FilesystemMockingTestCase):
'c': 'd',
}, json.loads(c))
+ @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_template_deletes(self):
tpl_file = util.load_file('templates/chef_client.rb.tmpl')
self.patchUtils(self.tmp)
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
index 8891ca04..80708d7b 100644
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -26,6 +26,8 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -33,7 +35,8 @@ LOG = logging.getLogger(__name__)
class TestDebug(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestDebug, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro, metadata=None):
self.patchUtils(self.new_root)
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
index 5d0636d1..bef0d80d 100644
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -1,14 +1,23 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import util
from cloudinit.config import cc_growpart
+from ..helpers import TestCase
import errno
import logging
import os
import re
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
# growpart:
# mode: auto # off, on, auto, 'growpart'
@@ -42,7 +51,7 @@ growpart disk partition
"""
-class TestDisabled(MockerTestCase):
+class TestDisabled(unittest.TestCase):
def setUp(self):
super(TestDisabled, self).setUp()
self.name = "growpart"
@@ -57,14 +66,14 @@ class TestDisabled(MockerTestCase):
# this really only verifies that resizer_factory isn't called
config = {'growpart': {'mode': 'off'}}
- self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- self.mocker.replay()
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj:
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(mockobj.call_count, 0)
-class TestConfig(MockerTestCase):
+class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "growpart"
@@ -77,75 +86,76 @@ class TestConfig(MockerTestCase):
self.cloud_init = None
self.handle = cc_growpart.handle
- # Order must be correct
- self.mocker.order()
-
def test_no_resizers_auto_is_fine(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
- config = {'growpart': {'mode': 'auto'}}
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_no_resizers_mode_growpart_is_exception(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(
+ ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
- config = {'growpart': {'mode': "growpart"}}
- self.assertRaises(ValueError, self.handle, self.name, config,
- self.cloud_init, self.log, self.args)
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_mode_auto_prefers_growpart(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
- ret = cc_growpart.resizer_factory(mode="auto")
- self.assertTrue(isinstance(ret, cc_growpart.ResizeGrowPart))
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_handle_with_no_growpart_entry(self):
# if no 'growpart' entry in config, then mode=auto should be used
myresizer = object()
+ retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),)
+
+ with ExitStack() as mocks:
+ factory = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resizer_factory',
+ return_value=myresizer))
+ rsdevs = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resize_devices',
+ return_value=retval))
+ mocks.enter_context(
+ mock.patch.object(cc_growpart, 'RESIZERS',
+ (('mysizer', object),)
+ ))
- factory = self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- rsdevs = self.mocker.replace(cc_growpart.resize_devices,
- passthrough=False)
- factory("auto")
- self.mocker.result(myresizer)
- rsdevs(myresizer, ["/"])
- self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
- self.mocker.replay()
-
- try:
- orig_resizers = cc_growpart.RESIZERS
- cc_growpart.RESIZERS = (('mysizer', object),)
self.handle(self.name, {}, self.cloud_init, self.log, self.args)
- finally:
- cc_growpart.RESIZERS = orig_resizers
+ factory.assert_called_once_with('auto')
+ rsdevs.assert_called_once_with(myresizer, ['/'])
-class TestResize(MockerTestCase):
+
+class TestResize(unittest.TestCase):
def setUp(self):
super(TestResize, self).setUp()
self.name = "growpart"
self.log = logging.getLogger("TestResize")
- # Order must be correct
- self.mocker.order()
-
def test_simple_devices(self):
# test simple device list
# this patches out devent2dev, os.stat, and device_part_info
# so in the end, doesn't test a lot
devs = ["/dev/XXda1", "/dev/YYda2"]
- devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,
st_nlink=1, st_uid=0, st_gid=6, st_size=0,
st_atime=0, st_mtime=0, st_ctime=0)
enoent = ["/dev/NOENT"]
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
index eb251636..de85eff6 100644
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -29,9 +29,11 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestLocale(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestLocale, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -59,6 +62,6 @@ class TestLocale(t_help.FilesystemMockingTestCase):
cc = self._get_cloud('sles')
cc_locale.handle('cc_locale', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/language')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/language', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 40481f16..0bcdcb31 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -18,11 +18,10 @@
from cloudinit.config import cc_seed_random
-import base64
import gzip
import tempfile
-from StringIO import StringIO
+from six import BytesIO
from cloudinit import cloud
from cloudinit import distros
@@ -69,7 +68,7 @@ class TestRandomSeed(t_help.TestCase):
return
def _compress(self, text):
- contents = StringIO()
+ contents = BytesIO()
gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
gz_fh.write(text)
gz_fh.close()
@@ -96,7 +95,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-tim-was-here", contents)
def test_append_random_unknown_encoding(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -108,7 +107,7 @@ class TestRandomSeed(t_help.TestCase):
self._get_cloud('ubuntu'), LOG, [])
def test_append_random_gzip(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -121,7 +120,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-toe", contents)
def test_append_random_gz(self):
- data = self._compress("big-toe")
+ data = self._compress(b"big-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -134,7 +133,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("big-toe", contents)
def test_append_random_base64(self):
- data = base64.b64encode('bubbles')
+ data = util.b64e('bubbles')
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -147,7 +146,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("bubbles", contents)
def test_append_random_b64(self):
- data = base64.b64encode('kit-kat')
+ data = util.b64e('kit-kat')
cfg = {
'random_seed': {
'file': self._seed_file,
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index e1530e30..d358b069 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -7,9 +7,11 @@ from cloudinit import util
from .. import helpers as t_help
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
from configobj import ConfigObj
@@ -19,7 +21,8 @@ LOG = logging.getLogger(__name__)
class TestHostname(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestHostname, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def _fetch_distro(self, kind):
cls = distros.fetch(kind)
@@ -38,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
if not distro.uses_systemd():
- contents = util.load_file("/etc/sysconfig/network")
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/sysconfig/network", decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
index 874db340..e3df8759 100644
--- a/tests/unittests/test_handler/test_handler_timezone.py
+++ b/tests/unittests/test_handler/test_handler_timezone.py
@@ -29,8 +29,10 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
+import shutil
+import tempfile
import logging
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestTimezone(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestTimezone, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -67,8 +70,8 @@ class TestTimezone(t_help.FilesystemMockingTestCase):
cc_timezone.handle('cc_timezone', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/clock')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/clock', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg))
contents = util.load_file('/etc/localtime')
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 435c9787..3a8aa7c1 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -4,9 +4,11 @@ from cloudinit.config import cc_yum_add_repo
from .. import helpers
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
import configobj
@@ -16,7 +18,8 @@ LOG = logging.getLogger(__name__)
class TestConfig(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestConfig, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_bad_config(self):
cfg = {
@@ -52,8 +55,9 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
- contents = configobj.ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo",
+ decode=False)
+ contents = configobj.ConfigObj(BytesIO(contents))
expected = {
'epel_testing': {
'name': 'Extra Packages for Enterprise Linux 5 - Testing',
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 07b610f7..976d8283 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -11,11 +11,13 @@ import glob
import os
import random
import re
+import six
import string
SOURCE_PAT = "source*.*yaml"
EXPECTED_PAT = "expected%s.yaml"
-TYPES = [long, int, dict, str, list, tuple, None]
+TYPES = [dict, str, list, tuple, None]
+TYPES.extend(six.integer_types)
def _old_mergedict(src, cand):
@@ -25,7 +27,7 @@ def _old_mergedict(src, cand):
Nested dictionaries are merged recursively.
"""
if isinstance(src, dict) and isinstance(cand, dict):
- for (k, v) in cand.iteritems():
+ for (k, v) in cand.items():
if k not in src:
src[k] = v
else:
@@ -42,8 +44,8 @@ def _old_mergemanydict(*args):
def _random_str(rand):
base = ''
- for _i in xrange(rand.randint(1, 2 ** 8)):
- base += rand.choice(string.letters + string.digits)
+ for _i in range(rand.randint(1, 2 ** 8)):
+ base += rand.choice(string.ascii_letters + string.digits)
return base
@@ -64,7 +66,7 @@ def _make_dict(current_depth, max_depth, rand):
if t in [dict, list, tuple]:
if t in [dict]:
amount = rand.randint(0, 5)
- keys = [_random_str(rand) for _i in xrange(0, amount)]
+ keys = [_random_str(rand) for _i in range(0, amount)]
base = {}
for k in keys:
try:
@@ -74,14 +76,14 @@ def _make_dict(current_depth, max_depth, rand):
elif t in [list, tuple]:
base = []
amount = rand.randint(0, 5)
- for _i in xrange(0, amount):
+ for _i in range(0, amount):
try:
base.append(_make_dict(current_depth + 1, max_depth, rand))
except _NoMoreException:
pass
if t in [tuple]:
base = tuple(base)
- elif t in [long, int]:
+ elif t in six.integer_types:
base = rand.randint(0, 2 ** 8)
elif t in [str]:
base = _random_str(rand)
diff --git a/tests/unittests/test_pathprefix2dict.py b/tests/unittests/test_pathprefix2dict.py
index 590c4b82..7089bde6 100644
--- a/tests/unittests/test_pathprefix2dict.py
+++ b/tests/unittests/test_pathprefix2dict.py
@@ -1,13 +1,17 @@
from cloudinit import util
-from mocker import MockerTestCase
-from .helpers import populate_dir
+from .helpers import TestCase, populate_dir
+import shutil
+import tempfile
-class TestPathPrefix2Dict(MockerTestCase):
+
+class TestPathPrefix2Dict(TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ super(TestPathPrefix2Dict, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_required_only(self):
dirdata = {'f1': 'f1content', 'f2': 'f2content'}
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
index 977adb34..d0ec36a9 100644
--- a/tests/unittests/test_runs/test_merge_run.py
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -1,20 +1,22 @@
import os
+import shutil
+import tempfile
from .. import helpers
-from cloudinit.settings import (PER_INSTANCE)
+from cloudinit.settings import PER_INSTANCE
from cloudinit import stages
from cloudinit import util
class TestMergeRun(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def test_none_ds(self):
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.replicateTestRoot('simple_ubuntu', new_root)
cfg = {
'datasource_list': ['None'],
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
index c9ba949e..e19e65cd 100644
--- a/tests/unittests/test_runs/test_simple_run.py
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -1,20 +1,20 @@
import os
+import shutil
+import tempfile
from .. import helpers
-from cloudinit.settings import (PER_INSTANCE)
+from cloudinit.settings import PER_INSTANCE
from cloudinit import stages
from cloudinit import util
class TestSimpleRun(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def _pp_root(self, root, repatch=True):
- self.restore()
for (dirpath, dirnames, filenames) in os.walk(root):
print(dirpath)
for f in filenames:
@@ -33,7 +33,8 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):
self._patchIn(root)
def test_none_ds(self):
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.replicateTestRoot('simple_ubuntu', new_root)
cfg = {
'datasource_list': ['None'],
@@ -41,7 +42,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):
{
'path': '/etc/blah.ini',
'content': 'blah',
- 'permissions': 0755,
+ 'permissions': 0o755,
},
],
'cloud_init_modules': ['write-files'],
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index 3ba4ed8a..cf7c03b0 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -16,11 +16,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
+
+import sys
+import six
+import unittest
+
from . import helpers as test_helpers
import textwrap
from cloudinit import templater
+try:
+ import Cheetah
+ HAS_CHEETAH = True
+except ImportError:
+ HAS_CHEETAH = False
+
class TestTemplates(test_helpers.TestCase):
def test_render_basic(self):
@@ -38,6 +50,7 @@ class TestTemplates(test_helpers.TestCase):
out_data = templater.basic_render(in_data, {'b': 2})
self.assertEqual(expected_data.strip(), out_data)
+ @test_helpers.skipIf(not HAS_CHEETAH, 'cheetah renderer not available')
def test_detection(self):
blob = "## template:cheetah"
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 3e079131..a1bd2c46 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,10 +1,18 @@
+from __future__ import print_function
+
import os
import stat
import yaml
+import shutil
+import tempfile
-from mocker import MockerTestCase
from . import helpers
-import unittest
+import six
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from cloudinit import importer
from cloudinit import util
@@ -29,7 +37,7 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestGetCfgOptionListOrStr(unittest.TestCase):
+class TestGetCfgOptionListOrStr(helpers.TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
@@ -61,10 +69,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):
self.assertEqual([], result)
-class TestWriteFile(MockerTestCase):
+class TestWriteFile(helpers.TestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_basic_usage(self):
"""Verify basic usage with default args."""
@@ -79,7 +88,7 @@ class TestWriteFile(MockerTestCase):
create_contents = f.read()
self.assertEqual(contents, create_contents)
file_stat = os.stat(path)
- self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
def test_dir_is_created_if_required(self):
"""Verifiy that directories are created is required."""
@@ -97,12 +106,12 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- util.write_file(path, contents, mode=0666)
+ util.write_file(path, contents, mode=0o666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
file_stat = os.stat(path)
- self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
def test_custom_omode(self):
"""Verify custom omode works properly."""
@@ -111,7 +120,7 @@ class TestWriteFile(MockerTestCase):
# Create file first with basic content
with open(path, "wb") as f:
- f.write("LINE1\n")
+ f.write(b"LINE1\n")
util.write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
@@ -126,23 +135,24 @@ class TestWriteFile(MockerTestCase):
with open(my_file, "w") as fp:
fp.write("My Content")
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock('selinux')
-
fake_se = FakeSelinux(my_file)
- self.mocker.result(fake_se)
- self.mocker.replay()
- with util.SeLinuxGuard(my_file) as is_on:
- self.assertTrue(is_on)
+
+ with mock.patch.object(importer, 'import_module',
+ return_value=fake_se) as mockobj:
+ with util.SeLinuxGuard(my_file) as is_on:
+ self.assertTrue(is_on)
+
self.assertEqual(1, len(fake_se.restored))
self.assertEqual(my_file, fake_se.restored[0])
+ mockobj.assert_called_once_with('selinux')
+
-class TestDeleteDirContents(MockerTestCase):
+class TestDeleteDirContents(helpers.TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
@@ -157,7 +167,7 @@ class TestDeleteDirContents(MockerTestCase):
def test_deletes_files(self):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -185,7 +195,7 @@ class TestDeleteDirContents(MockerTestCase):
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -196,7 +206,7 @@ class TestDeleteDirContents(MockerTestCase):
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
with open(file_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
os.symlink(file_name, link_name)
util.delete_dir_contents(self.tmp)
@@ -204,20 +214,20 @@ class TestDeleteDirContents(MockerTestCase):
self.assertDirEmpty(self.tmp)
-class TestKeyValStrings(unittest.TestCase):
+class TestKeyValStrings(helpers.TestCase):
def test_keyval_str_to_dict(self):
expected = {'1': 'one', '2': 'one+one', 'ro': True}
cmdline = "1=one ro 2=one+one"
self.assertEqual(expected, util.keyval_str_to_dict(cmdline))
-class TestGetCmdline(unittest.TestCase):
+class TestGetCmdline(helpers.TestCase):
def test_cmdline_reads_debug_env(self):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
-class TestLoadYaml(unittest.TestCase):
+class TestLoadYaml(helpers.TestCase):
mydefault = "7b03a8ebace993d806255121073fed52"
def test_simple(self):
@@ -246,8 +256,8 @@ class TestLoadYaml(unittest.TestCase):
self.mydefault)
def test_python_unicode(self):
- # complex type of python/unicde is explicitly allowed
- myobj = {'1': unicode("FOOBAR")}
+ # complex type of python/unicode is explicitly allowed
+ myobj = {'1': six.text_type("FOOBAR")}
safe_yaml = yaml.dump(myobj)
self.assertEqual(util.load_yaml(blob=safe_yaml,
default=self.mydefault),
@@ -314,17 +324,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
class TestReadDMIData(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def _write_key(self, key, content):
"""Mocks the sys path found on Linux systems."""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
- dmi_key = "/sys/class/dmi/id/{}".format(key)
+ dmi_key = "/sys/class/dmi/id/{0}".format(key)
util.write_file(dmi_key, content)
def _no_syspath(self, key, content):
@@ -332,7 +342,8 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
In order to test a missing sys path and call outs to dmidecode, this
function fakes the results of dmidecode to test the results.
"""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
self.real_which = util.which
self.real_subp = util.subp
diff --git a/tools/ccfg-merge-debug b/tools/ccfg-merge-debug
index 85227da7..1f08e0cb 100755
--- a/tools/ccfg-merge-debug
+++ b/tools/ccfg-merge-debug
@@ -51,7 +51,7 @@ def main():
c_handlers.register(ccph)
called = []
- for (_ctype, mod) in c_handlers.iteritems():
+ for (_ctype, mod) in c_handlers.items():
if mod in called:
continue
handlers.call_begin(mod, data, frequency)
@@ -76,7 +76,7 @@ def main():
# Give callbacks opportunity to finalize
called = []
- for (_ctype, mod) in c_handlers.iteritems():
+ for (_ctype, mod) in c_handlers.items():
if mod in called:
continue
handlers.call_end(mod, data, frequency)
diff --git a/tools/read-dependencies b/tools/read-dependencies
index fee3efcf..6a6f3e12 100755
--- a/tools/read-dependencies
+++ b/tools/read-dependencies
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import os
+import re
import sys
if 'CLOUD_INIT_TOP_D' in os.environ:
@@ -14,10 +15,15 @@ for fname in ("setup.py", "requirements.txt"):
"exist in cloud-init root directory." % fname)
sys.exit(1)
-with open(os.path.join(topd, "requirements.txt"), "r") as fp:
+if len(sys.argv) > 1:
+ reqfile = sys.argv[1]
+else:
+ reqfile = "requirements.txt"
+
+with open(os.path.join(topd, reqfile), "r") as fp:
for line in fp:
if not line.strip() or line.startswith("#"):
continue
- sys.stdout.write(line)
+ sys.stdout.write(re.split("[>=.<]*", line)[0].strip() + "\n")
sys.exit(0)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 00000000..d04cd47c
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,23 @@
+[tox]
+envlist = py26,py27,py34
+recreate = True
+
+[testenv]
+commands = python -m nose tests
+deps =
+ contextlib2
+ httpretty>=0.7.1
+ mock
+ nose
+ pep8==1.5.7
+ pyflakes
+
+[testenv:py26]
+commands = nosetests tests
+deps =
+ contextlib2
+ httpretty>=0.7.1
+ mock
+ nose
+ pep8==1.5.7
+ pyflakes