summaryrefslogtreecommitdiff
path: root/cloudinit/cmd/devel
diff options
context:
space:
mode:
authorzsdc <taras@vyos.io>2022-03-25 20:58:01 +0200
committerzsdc <taras@vyos.io>2022-03-25 21:42:00 +0200
commit31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (patch)
tree349631a02467dae0158f6f663cc8aa8537974a97 /cloudinit/cmd/devel
parent5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff)
parent8537237d80a48c8f0cbf8e66aa4826bbc882b022 (diff)
downloadvyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.tar.gz
vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.zip
T2117: Cloud-init updated to 22.1
Merged with 22.1 tag from the upstream Cloud-init repository. Our modules were slightly modified for compatibility with the new version.
Diffstat (limited to 'cloudinit/cmd/devel')
-rw-r--r--cloudinit/cmd/devel/__init__.py3
-rw-r--r--cloudinit/cmd/devel/hotplug_hook.py291
-rw-r--r--cloudinit/cmd/devel/logs.py134
-rwxr-xr-xcloudinit/cmd/devel/make_mime.py113
-rwxr-xr-xcloudinit/cmd/devel/net_convert.py150
-rw-r--r--cloudinit/cmd/devel/parser.py45
-rwxr-xr-xcloudinit/cmd/devel/render.py54
-rw-r--r--cloudinit/cmd/devel/tests/__init__.py0
-rw-r--r--cloudinit/cmd/devel/tests/test_logs.py167
-rw-r--r--cloudinit/cmd/devel/tests/test_render.py144
10 files changed, 611 insertions, 490 deletions
diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py
index 3ae28b69..ead5f7a9 100644
--- a/cloudinit/cmd/devel/__init__.py
+++ b/cloudinit/cmd/devel/__init__.py
@@ -11,7 +11,7 @@ from cloudinit.stages import Init
def addLogHandlerCLI(logger, log_level):
"""Add a commandline logging handler to emit messages to stderr."""
- formatter = logging.Formatter('%(levelname)s: %(message)s')
+ formatter = logging.Formatter("%(levelname)s: %(message)s")
log.setupBasicLogging(log_level, formatter=formatter)
return logger
@@ -22,4 +22,5 @@ def read_cfg_paths():
init.read_cfg()
return init.paths
+
# vi: ts=4 expandtab
diff --git a/cloudinit/cmd/devel/hotplug_hook.py b/cloudinit/cmd/devel/hotplug_hook.py
new file mode 100644
index 00000000..a9be0379
--- /dev/null
+++ b/cloudinit/cmd/devel/hotplug_hook.py
@@ -0,0 +1,291 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Handle reconfiguration on hotplug events"""
+import abc
+import argparse
+import os
+import sys
+import time
+
+from cloudinit import log, reporting, stages
+from cloudinit.event import EventScope, EventType
+from cloudinit.net import activators, read_sys_net_safe
+from cloudinit.net.network_state import parse_net_config_data
+from cloudinit.reporting import events
+from cloudinit.sources import DataSource # noqa: F401
+from cloudinit.sources import DataSourceNotFoundException
+from cloudinit.stages import Init
+
+LOG = log.getLogger(__name__)
+NAME = "hotplug-hook"
+
+
+def get_parser(parser=None):
+ """Build or extend an arg parser for hotplug-hook utility.
+
+ @param parser: Optional existing ArgumentParser instance representing the
+ subcommand which will be extended to support the args of this utility.
+
+ @returns: ArgumentParser with proper argument configuration.
+ """
+ if not parser:
+ parser = argparse.ArgumentParser(prog=NAME, description=__doc__)
+
+ parser.description = __doc__
+ parser.add_argument(
+ "-s",
+ "--subsystem",
+ required=True,
+ help="subsystem to act on",
+ choices=["net"],
+ )
+
+ subparsers = parser.add_subparsers(
+ title="Hotplug Action", dest="hotplug_action"
+ )
+ subparsers.required = True
+
+ subparsers.add_parser(
+ "query", help="query if hotplug is enabled for given subsystem"
+ )
+
+ parser_handle = subparsers.add_parser(
+ "handle", help="handle the hotplug event"
+ )
+ parser_handle.add_argument(
+ "-d",
+ "--devpath",
+ required=True,
+ metavar="PATH",
+ help="sysfs path to hotplugged device",
+ )
+ parser_handle.add_argument(
+ "-u",
+ "--udevaction",
+ required=True,
+ help="action to take",
+ choices=["add", "remove"],
+ )
+
+ return parser
+
+
+class UeventHandler(abc.ABC):
+ def __init__(self, id, datasource, devpath, action, success_fn):
+ self.id = id
+ self.datasource = datasource # type: DataSource
+ self.devpath = devpath
+ self.action = action
+ self.success_fn = success_fn
+
+ @abc.abstractmethod
+ def apply(self):
+ raise NotImplementedError()
+
+ @property
+ @abc.abstractmethod
+ def config(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def device_detected(self) -> bool:
+ raise NotImplementedError()
+
+ def detect_hotplugged_device(self):
+ detect_presence = None
+ if self.action == "add":
+ detect_presence = True
+ elif self.action == "remove":
+ detect_presence = False
+ else:
+ raise ValueError("Unknown action: %s" % self.action)
+
+ if detect_presence != self.device_detected():
+ raise RuntimeError(
+ "Failed to detect %s in updated metadata" % self.id
+ )
+
+ def success(self):
+ return self.success_fn()
+
+ def update_metadata(self):
+ result = self.datasource.update_metadata_if_supported(
+ [EventType.HOTPLUG]
+ )
+ if not result:
+ raise RuntimeError(
+ "Datasource %s not updated for event %s"
+ % (self.datasource, EventType.HOTPLUG)
+ )
+ return result
+
+
+class NetHandler(UeventHandler):
+ def __init__(self, datasource, devpath, action, success_fn):
+ # convert devpath to mac address
+ id = read_sys_net_safe(os.path.basename(devpath), "address")
+ super().__init__(id, datasource, devpath, action, success_fn)
+
+ def apply(self):
+ self.datasource.distro.apply_network_config(
+ self.config,
+ bring_up=False,
+ )
+ interface_name = os.path.basename(self.devpath)
+ activator = activators.select_activator()
+ if self.action == "add":
+ if not activator.bring_up_interface(interface_name):
+ raise RuntimeError(
+ "Failed to bring up device: {}".format(self.devpath)
+ )
+ elif self.action == "remove":
+ if not activator.bring_down_interface(interface_name):
+ raise RuntimeError(
+ "Failed to bring down device: {}".format(self.devpath)
+ )
+
+ @property
+ def config(self):
+ return self.datasource.network_config
+
+ def device_detected(self) -> bool:
+ netstate = parse_net_config_data(self.config)
+ found = [
+ iface
+ for iface in netstate.iter_interfaces()
+ if iface.get("mac_address") == self.id
+ ]
+ LOG.debug("Ifaces with ID=%s : %s", self.id, found)
+ return len(found) > 0
+
+
+SUBSYSTEM_PROPERTES_MAP = {
+ "net": (NetHandler, EventScope.NETWORK),
+}
+
+
+def is_enabled(hotplug_init, subsystem):
+ try:
+ scope = SUBSYSTEM_PROPERTES_MAP[subsystem][1]
+ except KeyError as e:
+ raise Exception(
+ "hotplug-hook: cannot handle events for subsystem: {}".format(
+ subsystem
+ )
+ ) from e
+
+ return stages.update_event_enabled(
+ datasource=hotplug_init.datasource,
+ cfg=hotplug_init.cfg,
+ event_source_type=EventType.HOTPLUG,
+ scope=scope,
+ )
+
+
+def initialize_datasource(hotplug_init, subsystem):
+ LOG.debug("Fetching datasource")
+ datasource = hotplug_init.fetch(existing="trust")
+
+ if not datasource.get_supported_events([EventType.HOTPLUG]):
+ LOG.debug("hotplug not supported for event of type %s", subsystem)
+ return
+
+ if not is_enabled(hotplug_init, subsystem):
+ LOG.debug("hotplug not enabled for event of type %s", subsystem)
+ return
+ return datasource
+
+
+def handle_hotplug(hotplug_init: Init, devpath, subsystem, udevaction):
+ datasource = initialize_datasource(hotplug_init, subsystem)
+ if not datasource:
+ return
+ handler_cls = SUBSYSTEM_PROPERTES_MAP[subsystem][0]
+ LOG.debug("Creating %s event handler", subsystem)
+ event_handler = handler_cls(
+ datasource=datasource,
+ devpath=devpath,
+ action=udevaction,
+ success_fn=hotplug_init._write_to_cache,
+ ) # type: UeventHandler
+ wait_times = [1, 3, 5, 10, 30]
+ for attempt, wait in enumerate(wait_times):
+ LOG.debug(
+ "subsystem=%s update attempt %s/%s",
+ subsystem,
+ attempt,
+ len(wait_times),
+ )
+ try:
+ LOG.debug("Refreshing metadata")
+ event_handler.update_metadata()
+ LOG.debug("Detecting device in updated metadata")
+ event_handler.detect_hotplugged_device()
+ LOG.debug("Applying config change")
+ event_handler.apply()
+ LOG.debug("Updating cache")
+ event_handler.success()
+ break
+ except Exception as e:
+ LOG.debug("Exception while processing hotplug event. %s", e)
+ time.sleep(wait)
+ last_exception = e
+ else:
+ raise last_exception # type: ignore
+
+
+def handle_args(name, args):
+ # Note that if an exception happens between now and when logging is
+ # setup, we'll only see it in the journal
+ hotplug_reporter = events.ReportEventStack(
+ name, __doc__, reporting_enabled=True
+ )
+
+ hotplug_init = Init(ds_deps=[], reporter=hotplug_reporter)
+ hotplug_init.read_cfg()
+
+ log.setupLogging(hotplug_init.cfg)
+ if "reporting" in hotplug_init.cfg:
+ reporting.update_configuration(hotplug_init.cfg.get("reporting"))
+ # Logging isn't going to be setup until now
+ LOG.debug(
+ "%s called with the following arguments: {"
+ "hotplug_action: %s, subsystem: %s, udevaction: %s, devpath: %s}",
+ name,
+ args.hotplug_action,
+ args.subsystem,
+ args.udevaction if "udevaction" in args else None,
+ args.devpath if "devpath" in args else None,
+ )
+
+ with hotplug_reporter:
+ try:
+ if args.hotplug_action == "query":
+ try:
+ datasource = initialize_datasource(
+ hotplug_init, args.subsystem
+ )
+ except DataSourceNotFoundException:
+ print(
+ "Unable to determine hotplug state. No datasource "
+ "detected"
+ )
+ sys.exit(1)
+ print("enabled" if datasource else "disabled")
+ else:
+ handle_hotplug(
+ hotplug_init=hotplug_init,
+ devpath=args.devpath,
+ subsystem=args.subsystem,
+ udevaction=args.udevaction,
+ )
+ except Exception:
+ LOG.exception("Received fatal exception handling hotplug!")
+ raise
+
+ LOG.debug("Exiting hotplug handler")
+ reporting.flush_events()
+
+
+if __name__ == "__main__":
+ args = get_parser().parse_args()
+ handle_args(NAME, args)
diff --git a/cloudinit/cmd/devel/logs.py b/cloudinit/cmd/devel/logs.py
index 51c61cca..d54b809a 100644
--- a/cloudinit/cmd/devel/logs.py
+++ b/cloudinit/cmd/devel/logs.py
@@ -5,20 +5,19 @@
"""Define 'collect-logs' utility and handler to include in cloud-init cmd."""
import argparse
-from datetime import datetime
import os
import shutil
import sys
+from datetime import datetime
from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
+from cloudinit.subp import ProcessExecutionError, subp
from cloudinit.temp_utils import tempdir
-from cloudinit.subp import (ProcessExecutionError, subp)
-from cloudinit.util import (chdir, copy, ensure_dir, write_file)
+from cloudinit.util import chdir, copy, ensure_dir, write_file
-
-CLOUDINIT_LOGS = ['/var/log/cloud-init.log', '/var/log/cloud-init-output.log']
-CLOUDINIT_RUN_DIR = '/run/cloud-init'
-USER_DATA_FILE = '/var/lib/cloud/instance/user-data.txt' # Optional
+CLOUDINIT_LOGS = ["/var/log/cloud-init.log", "/var/log/cloud-init-output.log"]
+CLOUDINIT_RUN_DIR = "/run/cloud-init"
+USER_DATA_FILE = "/var/lib/cloud/instance/user-data.txt" # Optional
def get_parser(parser=None):
@@ -32,27 +31,49 @@ def get_parser(parser=None):
"""
if not parser:
parser = argparse.ArgumentParser(
- prog='collect-logs',
- description='Collect and tar all cloud-init debug info')
- parser.add_argument('--verbose', '-v', action='count', default=0,
- dest='verbosity', help="Be more verbose.")
+ prog="collect-logs",
+ description="Collect and tar all cloud-init debug info",
+ )
+ parser.add_argument(
+ "--verbose",
+ "-v",
+ action="count",
+ default=0,
+ dest="verbosity",
+ help="Be more verbose.",
+ )
parser.add_argument(
- "--tarfile", '-t', default='cloud-init.tar.gz',
- help=('The tarfile to create containing all collected logs.'
- ' Default: cloud-init.tar.gz'))
+ "--tarfile",
+ "-t",
+ default="cloud-init.tar.gz",
+ help=(
+ "The tarfile to create containing all collected logs."
+ " Default: cloud-init.tar.gz"
+ ),
+ )
parser.add_argument(
- "--include-userdata", '-u', default=False, action='store_true',
- dest='userdata', help=(
- 'Optionally include user-data from {0} which could contain'
- ' sensitive information.'.format(USER_DATA_FILE)))
+ "--include-userdata",
+ "-u",
+ default=False,
+ action="store_true",
+ dest="userdata",
+ help=(
+ "Optionally include user-data from {0} which could contain"
+ " sensitive information.".format(USER_DATA_FILE)
+ ),
+ )
return parser
-def _copytree_ignore_sensitive_files(curdir, files):
- """Return a list of files to ignore if we are non-root"""
- if os.getuid() == 0:
- return ()
- return (INSTANCE_JSON_SENSITIVE_FILE,) # Ignore root-permissioned files
+def _copytree_rundir_ignore_files(curdir, files):
+ """Return a list of files to ignore for /run/cloud-init directory"""
+ ignored_files = [
+ "hook-hotplug-cmd", # named pipe for hotplug
+ ]
+ if os.getuid() != 0:
+ # Ignore root-permissioned files
+ ignored_files.append(INSTANCE_JSON_SENSITIVE_FILE)
+ return ignored_files
def _write_command_output_to_file(cmd, filename, msg, verbosity):
@@ -90,48 +111,67 @@ def collect_logs(tarfile, include_userdata, verbosity=0):
if include_userdata and os.getuid() != 0:
sys.stderr.write(
"To include userdata, root user is required."
- " Try sudo cloud-init collect-logs\n")
+ " Try sudo cloud-init collect-logs\n"
+ )
return 1
tarfile = os.path.abspath(tarfile)
- date = datetime.utcnow().date().strftime('%Y-%m-%d')
- log_dir = 'cloud-init-logs-{0}'.format(date)
- with tempdir(dir='/tmp') as tmp_dir:
+ date = datetime.utcnow().date().strftime("%Y-%m-%d")
+ log_dir = "cloud-init-logs-{0}".format(date)
+ with tempdir(dir="/tmp") as tmp_dir:
log_dir = os.path.join(tmp_dir, log_dir)
version = _write_command_output_to_file(
- ['cloud-init', '--version'],
- os.path.join(log_dir, 'version'),
- "cloud-init --version", verbosity)
+ ["cloud-init", "--version"],
+ os.path.join(log_dir, "version"),
+ "cloud-init --version",
+ verbosity,
+ )
dpkg_ver = _write_command_output_to_file(
- ['dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'],
- os.path.join(log_dir, 'dpkg-version'),
- "dpkg version", verbosity)
+ ["dpkg-query", "--show", "-f=${Version}\n", "cloud-init"],
+ os.path.join(log_dir, "dpkg-version"),
+ "dpkg version",
+ verbosity,
+ )
if not version:
version = dpkg_ver if dpkg_ver else "not-available"
_debug("collected cloud-init version: %s\n" % version, 1, verbosity)
_write_command_output_to_file(
- ['dmesg'], os.path.join(log_dir, 'dmesg.txt'),
- "dmesg output", verbosity)
+ ["dmesg"],
+ os.path.join(log_dir, "dmesg.txt"),
+ "dmesg output",
+ verbosity,
+ )
_write_command_output_to_file(
- ['journalctl', '--boot=0', '-o', 'short-precise'],
- os.path.join(log_dir, 'journal.txt'),
- "systemd journal of current boot", verbosity)
+ ["journalctl", "--boot=0", "-o", "short-precise"],
+ os.path.join(log_dir, "journal.txt"),
+ "systemd journal of current boot",
+ verbosity,
+ )
for log in CLOUDINIT_LOGS:
_collect_file(log, log_dir, verbosity)
if include_userdata:
_collect_file(USER_DATA_FILE, log_dir, verbosity)
- run_dir = os.path.join(log_dir, 'run')
+ run_dir = os.path.join(log_dir, "run")
ensure_dir(run_dir)
if os.path.exists(CLOUDINIT_RUN_DIR):
- shutil.copytree(CLOUDINIT_RUN_DIR,
- os.path.join(run_dir, 'cloud-init'),
- ignore=_copytree_ignore_sensitive_files)
+ try:
+ shutil.copytree(
+ CLOUDINIT_RUN_DIR,
+ os.path.join(run_dir, "cloud-init"),
+ ignore=_copytree_rundir_ignore_files,
+ )
+ except shutil.Error as e:
+ sys.stderr.write("Failed collecting file(s) due to error:\n")
+ sys.stderr.write(str(e) + "\n")
_debug("collected dir %s\n" % CLOUDINIT_RUN_DIR, 1, verbosity)
else:
- _debug("directory '%s' did not exist\n" % CLOUDINIT_RUN_DIR, 1,
- verbosity)
+ _debug(
+ "directory '%s' did not exist\n" % CLOUDINIT_RUN_DIR,
+ 1,
+ verbosity,
+ )
with chdir(tmp_dir):
- subp(['tar', 'czvf', tarfile, log_dir.replace(tmp_dir + '/', '')])
+ subp(["tar", "czvf", tarfile, log_dir.replace(tmp_dir + "/", "")])
sys.stderr.write("Wrote %s\n" % tarfile)
return 0
@@ -144,10 +184,10 @@ def handle_collect_logs_args(name, args):
def main():
"""Tool to collect and tar all cloud-init related logs."""
parser = get_parser()
- return handle_collect_logs_args('collect-logs', parser.parse_args())
+ return handle_collect_logs_args("collect-logs", parser.parse_args())
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
# vi: ts=4 expandtab
diff --git a/cloudinit/cmd/devel/make_mime.py b/cloudinit/cmd/devel/make_mime.py
index 4e6a5778..c7671a93 100755
--- a/cloudinit/cmd/devel/make_mime.py
+++ b/cloudinit/cmd/devel/make_mime.py
@@ -9,19 +9,44 @@ from email.mime.text import MIMEText
from cloudinit import log
from cloudinit.handlers import INCLUSION_TYPES_MAP
+
from . import addLogHandlerCLI
-NAME = 'make-mime'
+NAME = "make-mime"
LOG = log.getLogger(NAME)
-EPILOG = ("Example: make-mime -a config.yaml:cloud-config "
- "-a script.sh:x-shellscript > user-data")
+EPILOG = (
+ "Example: make-mime -a config.yaml:cloud-config "
+ "-a script.sh:x-shellscript > user-data"
+)
+
+
+def create_mime_message(files):
+ sub_messages = []
+ errors = []
+ for i, (fh, filename, format_type) in enumerate(files):
+ contents = fh.read()
+ sub_message = MIMEText(contents, format_type, sys.getdefaultencoding())
+ sub_message.add_header(
+ "Content-Disposition", 'attachment; filename="%s"' % (filename)
+ )
+ content_type = sub_message.get_content_type().lower()
+ if content_type not in get_content_types():
+ msg = (
+ "content type %r for attachment %s " "may be incorrect!"
+ ) % (content_type, i + 1)
+ errors.append(msg)
+ sub_messages.append(sub_message)
+ combined_message = MIMEMultipart()
+ for msg in sub_messages:
+ combined_message.attach(msg)
+ return (combined_message, errors)
def file_content_type(text):
- """ Return file content type by reading the first line of the input. """
+ """Return file content type by reading the first line of the input."""
try:
filename, content_type = text.split(":", 1)
- return (open(filename, 'r'), filename, content_type.strip())
+ return (open(filename, "r"), filename, content_type.strip())
except ValueError as e:
raise argparse.ArgumentError(
text, "Invalid value for %r" % (text)
@@ -41,26 +66,43 @@ def get_parser(parser=None):
# update the parser's doc and add an epilog to show an example
parser.description = __doc__
parser.epilog = EPILOG
- parser.add_argument("-a", "--attach", dest="files", type=file_content_type,
- action='append', default=[],
- metavar="<file>:<content-type>",
- help=("attach the given file as the specified "
- "content-type"))
- parser.add_argument('-l', '--list-types', action='store_true',
- default=False,
- help='List support cloud-init content types.')
- parser.add_argument('-f', '--force', action='store_true',
- default=False,
- help='Ignore unknown content-type warnings')
+ parser.add_argument(
+ "-a",
+ "--attach",
+ dest="files",
+ type=file_content_type,
+ action="append",
+ default=[],
+ metavar="<file>:<content-type>",
+ help="attach the given file as the specified content-type",
+ )
+ parser.add_argument(
+ "-l",
+ "--list-types",
+ action="store_true",
+ default=False,
+ help="List support cloud-init content types.",
+ )
+ parser.add_argument(
+ "-f",
+ "--force",
+ action="store_true",
+ default=False,
+ help="Ignore unknown content-type warnings",
+ )
return parser
def get_content_types(strip_prefix=False):
- """ Return a list of cloud-init supported content types. Optionally
- strip out the leading 'text/' of the type if strip_prefix=True.
+ """Return a list of cloud-init supported content types. Optionally
+ strip out the leading 'text/' of the type if strip_prefix=True.
"""
- return sorted([ctype.replace("text/", "") if strip_prefix else ctype
- for ctype in INCLUSION_TYPES_MAP.values()])
+ return sorted(
+ [
+ ctype.replace("text/", "") if strip_prefix else ctype
+ for ctype in INCLUSION_TYPES_MAP.values()
+ ]
+ )
def handle_args(name, args):
@@ -77,37 +119,24 @@ def handle_args(name, args):
print("\n".join(get_content_types(strip_prefix=True)))
return 0
- sub_messages = []
- errors = []
- for i, (fh, filename, format_type) in enumerate(args.files):
- contents = fh.read()
- sub_message = MIMEText(contents, format_type, sys.getdefaultencoding())
- sub_message.add_header('Content-Disposition',
- 'attachment; filename="%s"' % (filename))
- content_type = sub_message.get_content_type().lower()
- if content_type not in get_content_types():
- level = "WARNING" if args.force else "ERROR"
- msg = (level + ": content type %r for attachment %s "
- "may be incorrect!") % (content_type, i + 1)
- sys.stderr.write(msg + '\n')
- errors.append(msg)
- sub_messages.append(sub_message)
- if len(errors) and not args.force:
+ combined_message, errors = create_mime_message(args.files)
+ if errors:
+ level = "WARNING" if args.force else "ERROR"
+ for error in errors:
+ sys.stderr.write(f"{level}: {error}\n")
sys.stderr.write("Invalid content-types, override with --force\n")
- return 1
- combined_message = MIMEMultipart()
- for msg in sub_messages:
- combined_message.attach(msg)
+ if not args.force:
+ return 1
print(combined_message)
return 0
def main():
args = get_parser().parse_args()
- return(handle_args(NAME, args))
+ return handle_args(NAME, args)
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
diff --git a/cloudinit/cmd/devel/net_convert.py b/cloudinit/cmd/devel/net_convert.py
index 80d217ca..18b1e7ff 100755
--- a/cloudinit/cmd/devel/net_convert.py
+++ b/cloudinit/cmd/devel/net_convert.py
@@ -6,15 +6,13 @@ import json
import os
import sys
-from cloudinit.sources.helpers import openstack
+from cloudinit import distros, log, safeyaml
+from cloudinit.net import eni, netplan, network_state, networkd, sysconfig
from cloudinit.sources import DataSourceAzure as azure
from cloudinit.sources import DataSourceOVF as ovf
+from cloudinit.sources.helpers import openstack
-from cloudinit import distros, safeyaml
-from cloudinit.net import eni, netplan, network_state, sysconfig
-from cloudinit import log
-
-NAME = 'net-convert'
+NAME = "net-convert"
def get_parser(parser=None):
@@ -27,30 +25,59 @@ def get_parser(parser=None):
"""
if not parser:
parser = argparse.ArgumentParser(prog=NAME, description=__doc__)
- parser.add_argument("-p", "--network-data", type=open,
- metavar="PATH", required=True)
- parser.add_argument("-k", "--kind",
- choices=['eni', 'network_data.json', 'yaml',
- 'azure-imds', 'vmware-imc'],
- required=True)
- parser.add_argument("-d", "--directory",
- metavar="PATH",
- help="directory to place output in",
- required=True)
- parser.add_argument("-D", "--distro",
- choices=[item for sublist in
- distros.OSFAMILIES.values()
- for item in sublist],
- required=True)
- parser.add_argument("-m", "--mac",
- metavar="name,mac",
- action='append',
- help="interface name to mac mapping")
- parser.add_argument("--debug", action='store_true',
- help='enable debug logging to stderr.')
- parser.add_argument("-O", "--output-kind",
- choices=['eni', 'netplan', 'sysconfig'],
- required=True)
+ parser.add_argument(
+ "-p",
+ "--network-data",
+ type=open,
+ metavar="PATH",
+ required=True,
+ help="The network configuration to read",
+ )
+ parser.add_argument(
+ "-k",
+ "--kind",
+ choices=[
+ "eni",
+ "network_data.json",
+ "yaml",
+ "azure-imds",
+ "vmware-imc",
+ ],
+ required=True,
+ help="The format of the given network config",
+ )
+ parser.add_argument(
+ "-d",
+ "--directory",
+ metavar="PATH",
+ help="directory to place output in",
+ required=True,
+ )
+ parser.add_argument(
+ "-D",
+ "--distro",
+ choices=[
+ item for sublist in distros.OSFAMILIES.values() for item in sublist
+ ],
+ required=True,
+ )
+ parser.add_argument(
+ "-m",
+ "--mac",
+ metavar="name,mac",
+ action="append",
+ help="interface name to mac mapping",
+ )
+ parser.add_argument(
+ "--debug", action="store_true", help="enable debug logging to stderr."
+ )
+ parser.add_argument(
+ "-O",
+ "--output-kind",
+ choices=["eni", "netplan", "networkd", "sysconfig"],
+ required=True,
+ help="The network config format to emit",
+ )
return parser
@@ -78,57 +105,68 @@ def handle_args(name, args):
pre_ns = eni.convert_eni_data(net_data)
elif args.kind == "yaml":
pre_ns = safeyaml.load(net_data)
- if 'network' in pre_ns:
- pre_ns = pre_ns.get('network')
+ if "network" in pre_ns:
+ pre_ns = pre_ns.get("network")
if args.debug:
- sys.stderr.write('\n'.join(
- ["Input YAML", safeyaml.dumps(pre_ns), ""]))
- elif args.kind == 'network_data.json':
+ sys.stderr.write(
+ "\n".join(["Input YAML", safeyaml.dumps(pre_ns), ""])
+ )
+ elif args.kind == "network_data.json":
pre_ns = openstack.convert_net_json(
- json.loads(net_data), known_macs=known_macs)
- elif args.kind == 'azure-imds':
+ json.loads(net_data), known_macs=known_macs
+ )
+ elif args.kind == "azure-imds":
pre_ns = azure.parse_network_config(json.loads(net_data))
- elif args.kind == 'vmware-imc':
+ elif args.kind == "vmware-imc":
config = ovf.Config(ovf.ConfigFile(args.network_data.name))
pre_ns = ovf.get_network_config_from_conf(config, False)
ns = network_state.parse_net_config_data(pre_ns)
- if not ns:
- raise RuntimeError("No valid network_state object created from"
- " input data")
if args.debug:
- sys.stderr.write('\n'.join(
- ["", "Internal State", safeyaml.dumps(ns), ""]))
+ sys.stderr.write(
+ "\n".join(["", "Internal State", safeyaml.dumps(ns), ""])
+ )
distro_cls = distros.fetch(args.distro)
distro = distro_cls(args.distro, {}, None)
config = {}
if args.output_kind == "eni":
r_cls = eni.Renderer
- config = distro.renderer_configs.get('eni')
+ config = distro.renderer_configs.get("eni")
elif args.output_kind == "netplan":
r_cls = netplan.Renderer
- config = distro.renderer_configs.get('netplan')
+ config = distro.renderer_configs.get("netplan")
# don't run netplan generate/apply
- config['postcmds'] = False
+ config["postcmds"] = False
# trim leading slash
- config['netplan_path'] = config['netplan_path'][1:]
+ config["netplan_path"] = config["netplan_path"][1:]
# enable some netplan features
- config['features'] = ['dhcp-use-domains', 'ipv6-mtu']
- else:
+ config["features"] = ["dhcp-use-domains", "ipv6-mtu"]
+ elif args.output_kind == "networkd":
+ r_cls = networkd.Renderer
+ config = distro.renderer_configs.get("networkd")
+ elif args.output_kind == "sysconfig":
r_cls = sysconfig.Renderer
- config = distro.renderer_configs.get('sysconfig')
+ config = distro.renderer_configs.get("sysconfig")
+ else:
+ raise RuntimeError("Invalid output_kind")
r = r_cls(config=config)
- sys.stderr.write(''.join([
- "Read input format '%s' from '%s'.\n" % (
- args.kind, args.network_data.name),
- "Wrote output format '%s' to '%s'\n" % (
- args.output_kind, args.directory)]) + "\n")
+ sys.stderr.write(
+ "".join(
+ [
+ "Read input format '%s' from '%s'.\n"
+ % (args.kind, args.network_data.name),
+ "Wrote output format '%s' to '%s'\n"
+ % (args.output_kind, args.directory),
+ ]
+ )
+ + "\n"
+ )
r.render_network_state(network_state=ns, target=args.directory)
-if __name__ == '__main__':
+if __name__ == "__main__":
args = get_parser().parse_args()
handle_args(NAME, args)
diff --git a/cloudinit/cmd/devel/parser.py b/cloudinit/cmd/devel/parser.py
index 1a3c46a4..76b16c2e 100644
--- a/cloudinit/cmd/devel/parser.py
+++ b/cloudinit/cmd/devel/parser.py
@@ -5,30 +5,47 @@
"""Define 'devel' subcommand argument parsers to include in cloud-init cmd."""
import argparse
+
from cloudinit.config import schema
-from . import net_convert
-from . import render
-from . import make_mime
+from . import hotplug_hook, make_mime, net_convert, render
def get_parser(parser=None):
if not parser:
parser = argparse.ArgumentParser(
- prog='cloudinit-devel',
- description='Run development cloud-init tools')
- subparsers = parser.add_subparsers(title='Subcommands', dest='subcommand')
+ prog="cloudinit-devel",
+ description="Run development cloud-init tools",
+ )
+ subparsers = parser.add_subparsers(title="Subcommands", dest="subcommand")
subparsers.required = True
subcmds = [
- ('schema', 'Validate cloud-config files for document schema',
- schema.get_parser, schema.handle_schema_args),
- (net_convert.NAME, net_convert.__doc__,
- net_convert.get_parser, net_convert.handle_args),
- (render.NAME, render.__doc__,
- render.get_parser, render.handle_args),
- (make_mime.NAME, make_mime.__doc__,
- make_mime.get_parser, make_mime.handle_args),
+ (
+ hotplug_hook.NAME,
+ hotplug_hook.__doc__,
+ hotplug_hook.get_parser,
+ hotplug_hook.handle_args,
+ ),
+ (
+ "schema",
+ "Validate cloud-config files for document schema",
+ schema.get_parser,
+ schema.handle_schema_args,
+ ),
+ (
+ net_convert.NAME,
+ net_convert.__doc__,
+ net_convert.get_parser,
+ net_convert.handle_args,
+ ),
+ (render.NAME, render.__doc__, render.get_parser, render.handle_args),
+ (
+ make_mime.NAME,
+ make_mime.__doc__,
+ make_mime.get_parser,
+ make_mime.handle_args,
+ ),
]
for (subcmd, helpmsg, get_parser, handler) in subcmds:
parser = subparsers.add_parser(subcmd, help=helpmsg)
diff --git a/cloudinit/cmd/devel/render.py b/cloudinit/cmd/devel/render.py
index 1090aa16..2f9a22a8 100755
--- a/cloudinit/cmd/devel/render.py
+++ b/cloudinit/cmd/devel/render.py
@@ -6,12 +6,13 @@ import argparse
import os
import sys
-from cloudinit.handlers.jinja_template import render_jinja_payload_from_file
from cloudinit import log
+from cloudinit.handlers.jinja_template import render_jinja_payload_from_file
from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE
+
from . import addLogHandlerCLI, read_cfg_paths
-NAME = 'render'
+NAME = "render"
LOG = log.getLogger(NAME)
@@ -27,13 +28,24 @@ def get_parser(parser=None):
if not parser:
parser = argparse.ArgumentParser(prog=NAME, description=__doc__)
parser.add_argument(
- 'user_data', type=str, help='Path to the user-data file to render')
+ "user_data", type=str, help="Path to the user-data file to render"
+ )
+ parser.add_argument(
+ "-i",
+ "--instance-data",
+ type=str,
+ help=(
+ "Optional path to instance-data.json file. Defaults to"
+ " /run/cloud-init/instance-data.json"
+ ),
+ )
parser.add_argument(
- '-i', '--instance-data', type=str,
- help=('Optional path to instance-data.json file. Defaults to'
- ' /run/cloud-init/instance-data.json'))
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='Add verbose messages during template render')
+ "-d",
+ "--debug",
+ action="store_true",
+ default=False,
+ help="Add verbose messages during template render",
+ )
return parser
@@ -54,34 +66,38 @@ def handle_args(name, args):
redacted_data_fn = os.path.join(paths.run_dir, INSTANCE_JSON_FILE)
if uid == 0:
instance_data_fn = os.path.join(
- paths.run_dir, INSTANCE_JSON_SENSITIVE_FILE)
+ paths.run_dir, INSTANCE_JSON_SENSITIVE_FILE
+ )
if not os.path.exists(instance_data_fn):
LOG.warning(
- 'Missing root-readable %s. Using redacted %s instead.',
- instance_data_fn, redacted_data_fn
+ "Missing root-readable %s. Using redacted %s instead.",
+ instance_data_fn,
+ redacted_data_fn,
)
instance_data_fn = redacted_data_fn
else:
instance_data_fn = redacted_data_fn
if not os.path.exists(instance_data_fn):
- LOG.error('Missing instance-data.json file: %s', instance_data_fn)
+ LOG.error("Missing instance-data.json file: %s", instance_data_fn)
return 1
try:
with open(args.user_data) as stream:
user_data = stream.read()
except IOError:
- LOG.error('Missing user-data file: %s', args.user_data)
+ LOG.error("Missing user-data file: %s", args.user_data)
return 1
try:
rendered_payload = render_jinja_payload_from_file(
- payload=user_data, payload_fn=args.user_data,
+ payload=user_data,
+ payload_fn=args.user_data,
instance_data_file=instance_data_fn,
- debug=True if args.debug else False)
+ debug=True if args.debug else False,
+ )
except RuntimeError as e:
- LOG.error('Cannot render from instance data: %s', str(e))
+ LOG.error("Cannot render from instance data: %s", str(e))
return 1
if not rendered_payload:
- LOG.error('Unable to render user-data file: %s', args.user_data)
+ LOG.error("Unable to render user-data file: %s", args.user_data)
return 1
sys.stdout.write(rendered_payload)
return 0
@@ -89,10 +105,10 @@ def handle_args(name, args):
def main():
args = get_parser().parse_args()
- return(handle_args(NAME, args))
+ return handle_args(NAME, args)
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
diff --git a/cloudinit/cmd/devel/tests/__init__.py b/cloudinit/cmd/devel/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cloudinit/cmd/devel/tests/__init__.py
+++ /dev/null
diff --git a/cloudinit/cmd/devel/tests/test_logs.py b/cloudinit/cmd/devel/tests/test_logs.py
deleted file mode 100644
index ddfd58e1..00000000
--- a/cloudinit/cmd/devel/tests/test_logs.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from datetime import datetime
-import os
-from io import StringIO
-
-from cloudinit.cmd.devel import logs
-from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
-from cloudinit.tests.helpers import (
- FilesystemMockingTestCase, mock, wrap_and_call)
-from cloudinit.subp import subp
-from cloudinit.util import ensure_dir, load_file, write_file
-
-
-@mock.patch('cloudinit.cmd.devel.logs.os.getuid')
-class TestCollectLogs(FilesystemMockingTestCase):
-
- def setUp(self):
- super(TestCollectLogs, self).setUp()
- self.new_root = self.tmp_dir()
- self.run_dir = self.tmp_path('run', self.new_root)
-
- def test_collect_logs_with_userdata_requires_root_user(self, m_getuid):
- """collect-logs errors when non-root user collects userdata ."""
- m_getuid.return_value = 100 # non-root
- output_tarfile = self.tmp_path('logs.tgz')
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- self.assertEqual(
- 1, logs.collect_logs(output_tarfile, include_userdata=True))
- self.assertEqual(
- 'To include userdata, root user is required.'
- ' Try sudo cloud-init collect-logs\n',
- m_stderr.getvalue())
-
- def test_collect_logs_creates_tarfile(self, m_getuid):
- """collect-logs creates a tarfile with all related cloud-init info."""
- m_getuid.return_value = 100
- log1 = self.tmp_path('cloud-init.log', self.new_root)
- write_file(log1, 'cloud-init-log')
- log2 = self.tmp_path('cloud-init-output.log', self.new_root)
- write_file(log2, 'cloud-init-output-log')
- ensure_dir(self.run_dir)
- write_file(self.tmp_path('results.json', self.run_dir), 'results')
- write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
- 'sensitive')
- output_tarfile = self.tmp_path('logs.tgz')
-
- date = datetime.utcnow().date().strftime('%Y-%m-%d')
- date_logdir = 'cloud-init-logs-{0}'.format(date)
-
- version_out = '/usr/bin/cloud-init 18.2fake\n'
- expected_subp = {
- ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'):
- '0.7fake\n',
- ('cloud-init', '--version'): version_out,
- ('dmesg',): 'dmesg-out\n',
- ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n',
- ('tar', 'czvf', output_tarfile, date_logdir): ''
- }
-
- def fake_subp(cmd):
- cmd_tuple = tuple(cmd)
- if cmd_tuple not in expected_subp:
- raise AssertionError(
- 'Unexpected command provided to subp: {0}'.format(cmd))
- if cmd == ['tar', 'czvf', output_tarfile, date_logdir]:
- subp(cmd) # Pass through tar cmd so we can check output
- return expected_subp[cmd_tuple], ''
-
- fake_stderr = mock.MagicMock()
-
- wrap_and_call(
- 'cloudinit.cmd.devel.logs',
- {'subp': {'side_effect': fake_subp},
- 'sys.stderr': {'new': fake_stderr},
- 'CLOUDINIT_LOGS': {'new': [log1, log2]},
- 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}},
- logs.collect_logs, output_tarfile, include_userdata=False)
- # unpack the tarfile and check file contents
- subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root])
- out_logdir = self.tmp_path(date_logdir, self.new_root)
- self.assertFalse(
- os.path.exists(
- os.path.join(out_logdir, 'run', 'cloud-init',
- INSTANCE_JSON_SENSITIVE_FILE)),
- 'Unexpected file found: %s' % INSTANCE_JSON_SENSITIVE_FILE)
- self.assertEqual(
- '0.7fake\n',
- load_file(os.path.join(out_logdir, 'dpkg-version')))
- self.assertEqual(version_out,
- load_file(os.path.join(out_logdir, 'version')))
- self.assertEqual(
- 'cloud-init-log',
- load_file(os.path.join(out_logdir, 'cloud-init.log')))
- self.assertEqual(
- 'cloud-init-output-log',
- load_file(os.path.join(out_logdir, 'cloud-init-output.log')))
- self.assertEqual(
- 'dmesg-out\n',
- load_file(os.path.join(out_logdir, 'dmesg.txt')))
- self.assertEqual(
- 'journal-out\n',
- load_file(os.path.join(out_logdir, 'journal.txt')))
- self.assertEqual(
- 'results',
- load_file(
- os.path.join(out_logdir, 'run', 'cloud-init', 'results.json')))
- fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile)
-
- def test_collect_logs_includes_optional_userdata(self, m_getuid):
- """collect-logs include userdata when --include-userdata is set."""
- m_getuid.return_value = 0
- log1 = self.tmp_path('cloud-init.log', self.new_root)
- write_file(log1, 'cloud-init-log')
- log2 = self.tmp_path('cloud-init-output.log', self.new_root)
- write_file(log2, 'cloud-init-output-log')
- userdata = self.tmp_path('user-data.txt', self.new_root)
- write_file(userdata, 'user-data')
- ensure_dir(self.run_dir)
- write_file(self.tmp_path('results.json', self.run_dir), 'results')
- write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
- 'sensitive')
- output_tarfile = self.tmp_path('logs.tgz')
-
- date = datetime.utcnow().date().strftime('%Y-%m-%d')
- date_logdir = 'cloud-init-logs-{0}'.format(date)
-
- version_out = '/usr/bin/cloud-init 18.2fake\n'
- expected_subp = {
- ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'):
- '0.7fake',
- ('cloud-init', '--version'): version_out,
- ('dmesg',): 'dmesg-out\n',
- ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n',
- ('tar', 'czvf', output_tarfile, date_logdir): ''
- }
-
- def fake_subp(cmd):
- cmd_tuple = tuple(cmd)
- if cmd_tuple not in expected_subp:
- raise AssertionError(
- 'Unexpected command provided to subp: {0}'.format(cmd))
- if cmd == ['tar', 'czvf', output_tarfile, date_logdir]:
- subp(cmd) # Pass through tar cmd so we can check output
- return expected_subp[cmd_tuple], ''
-
- fake_stderr = mock.MagicMock()
-
- wrap_and_call(
- 'cloudinit.cmd.devel.logs',
- {'subp': {'side_effect': fake_subp},
- 'sys.stderr': {'new': fake_stderr},
- 'CLOUDINIT_LOGS': {'new': [log1, log2]},
- 'CLOUDINIT_RUN_DIR': {'new': self.run_dir},
- 'USER_DATA_FILE': {'new': userdata}},
- logs.collect_logs, output_tarfile, include_userdata=True)
- # unpack the tarfile and check file contents
- subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root])
- out_logdir = self.tmp_path(date_logdir, self.new_root)
- self.assertEqual(
- 'user-data',
- load_file(os.path.join(out_logdir, 'user-data.txt')))
- self.assertEqual(
- 'sensitive',
- load_file(os.path.join(out_logdir, 'run', 'cloud-init',
- INSTANCE_JSON_SENSITIVE_FILE)))
- fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile)
diff --git a/cloudinit/cmd/devel/tests/test_render.py b/cloudinit/cmd/devel/tests/test_render.py
deleted file mode 100644
index a7fcf2ce..00000000
--- a/cloudinit/cmd/devel/tests/test_render.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import os
-from io import StringIO
-
-from collections import namedtuple
-from cloudinit.cmd.devel import render
-from cloudinit.helpers import Paths
-from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE
-from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJinja
-from cloudinit.util import ensure_dir, write_file
-
-
-class TestRender(CiTestCase):
-
- with_logs = True
-
- args = namedtuple('renderargs', 'user_data instance_data debug')
-
- def setUp(self):
- super(TestRender, self).setUp()
- self.tmp = self.tmp_dir()
-
- def test_handle_args_error_on_missing_user_data(self):
- """When user_data file path does not exist, log an error."""
- absent_file = self.tmp_path('user-data', dir=self.tmp)
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
- write_file(instance_data, '{}')
- args = self.args(
- user_data=absent_file, instance_data=instance_data, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
- self.assertIn(
- 'Missing user-data file: %s' % absent_file,
- self.logs.getvalue())
-
- def test_handle_args_error_on_missing_instance_data(self):
- """When instance_data file path does not exist, log an error."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- absent_file = self.tmp_path('instance-data', dir=self.tmp)
- args = self.args(
- user_data=user_data, instance_data=absent_file, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
- self.assertIn(
- 'Missing instance-data.json file: %s' % absent_file,
- self.logs.getvalue())
-
- def test_handle_args_defaults_instance_data(self):
- """When no instance_data argument, default to configured run_dir."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- self.assertIn(
- 'Missing instance-data.json file: %s' % json_file,
- self.logs.getvalue())
-
- def test_handle_args_root_fallback_from_sensitive_instance_data(self):
- """When root user defaults to sensitive.json."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- self.assertEqual(1, render.handle_args('anyname', args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
- self.assertIn(
- 'WARNING: Missing root-readable %s. Using redacted %s' % (
- json_sensitive, json_file), self.logs.getvalue())
- self.assertIn(
- 'ERROR: Missing instance-data.json file: %s' % json_file,
- self.logs.getvalue())
-
- def test_handle_args_root_uses_sensitive_instance_data(self):
- """When root user, and no instance-data arg, use sensitive.json."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my_var }}')
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- ensure_dir(run_dir)
- json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
- write_file(json_sensitive, '{"my-var": "jinja worked"}')
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- self.assertEqual(0, render.handle_args('anyname', args))
- self.assertIn('rendering: jinja worked', m_stdout.getvalue())
-
- @skipUnlessJinja()
- def test_handle_args_renders_instance_data_vars_in_template(self):
- """If user_data file is a jinja template render instance-data vars."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my_var }}')
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
- write_file(instance_data, '{"my-var": "jinja worked"}')
- args = self.args(
- user_data=user_data, instance_data=instance_data, debug=True)
- with mock.patch('sys.stderr', new_callable=StringIO) as m_console_err:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- self.assertEqual(0, render.handle_args('anyname', args))
- self.assertIn(
- 'DEBUG: Converted jinja variables\n{', self.logs.getvalue())
- self.assertIn(
- 'DEBUG: Converted jinja variables\n{', m_console_err.getvalue())
- self.assertEqual('rendering: jinja worked', m_stdout.getvalue())
-
- @skipUnlessJinja()
- def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self):
- """If user_data file has invalid jinja operations log warnings."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my-var }}')
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
- write_file(instance_data, '{"my-var": "jinja worked"}')
- args = self.args(
- user_data=user_data, instance_data=instance_data, debug=True)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
- self.assertIn(
- 'WARNING: Ignoring jinja template for %s: Undefined jinja'
- ' variable: "my-var". Jinja tried subtraction. Perhaps you meant'
- ' "my_var"?' % user_data,
- self.logs.getvalue())
-
-# vi: ts=4 expandtab