diff options
-rwxr-xr-x | cloudinit/net/cmdline.py | 31 | ||||
-rw-r--r-- | cloudinit/util.py | 37 | ||||
-rw-r--r-- | tests/unittests/helpers.py | 7 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 69 |
4 files changed, 110 insertions, 34 deletions
diff --git a/cloudinit/net/cmdline.py b/cloudinit/net/cmdline.py index 61e23697..38b27a52 100755 --- a/cloudinit/net/cmdline.py +++ b/cloudinit/net/cmdline.py @@ -9,41 +9,12 @@ import base64 import glob import gzip import io -import shlex -import sys - -import six from . import get_devicelist from . import read_sys_net_safe from cloudinit import util -PY26 = sys.version_info[0:2] == (2, 6) - - -def _shlex_split(blob): - if PY26 and isinstance(blob, six.text_type): - # Older versions don't support unicode input - blob = blob.encode("utf8") - return shlex.split(blob) - - -def _load_shell_content(content, add_empty=False, empty_val=None): - """Given shell like syntax (key=value\nkey2=value2\n) in content - return the data in dictionary form. If 'add_empty' is True - then add entries in to the returned dictionary for 'VAR=' - variables. Set their value to empty_val.""" - data = {} - for line in _shlex_split(content): - key, value = line.split("=", 1) - if not value: - value = empty_val - if add_empty or value: - data[key] = value - - return data - def _klibc_to_config_entry(content, mac_addrs=None): """Convert a klibc written shell content file to a 'config' entry @@ -63,7 +34,7 @@ def _klibc_to_config_entry(content, mac_addrs=None): if mac_addrs is None: mac_addrs = {} - data = _load_shell_content(content) + data = util.load_shell_content(content) try: name = data['DEVICE'] if 'DEVICE' in data else data['DEVICE6'] except KeyError: diff --git a/cloudinit/util.py b/cloudinit/util.py index 27a98330..67ff7ba3 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -24,6 +24,7 @@ import platform import pwd import random import re +import shlex import shutil import socket import stat @@ -75,6 +76,7 @@ CONTAINER_TESTS = (['systemd-detect-virt', '--quiet', '--container'], PROC_CMDLINE = None _LSB_RELEASE = {} +PY26 = sys.version_info[0:2] == (2, 6) def get_architecture(target=None): @@ -2424,6 +2426,18 @@ def system_is_snappy(): # channel.ini is configparser loadable. # snappy will move to using /etc/system-image/config.d/*.ini # this is certainly not a perfect test, but good enough for now. + orpath = "/etc/os-release" + try: + orinfo = load_shell_content(load_file(orpath, quiet=True)) + if orinfo.get('ID', '').lower() == "ubuntu-core": + return True + except ValueError as e: + LOG.warning("Unexpected error loading '%s': %s", orpath, e) + + cmdline = get_cmdline() + if 'snap_core=' in cmdline: + return True + content = load_file("/etc/system-image/channel.ini", quiet=True) if 'ubuntu-core' in content.lower(): return True @@ -2470,4 +2484,27 @@ def rootdev_from_cmdline(cmdline): return "/dev/" + found +def load_shell_content(content, add_empty=False, empty_val=None): + """Given shell like syntax (key=value\nkey2=value2\n) in content + return the data in dictionary form. If 'add_empty' is True + then add entries in to the returned dictionary for 'VAR=' + variables. Set their value to empty_val.""" + + def _shlex_split(blob): + if PY26 and isinstance(blob, six.text_type): + # Older versions don't support unicode input + blob = blob.encode("utf8") + return shlex.split(blob) + + data = {} + for line in _shlex_split(content): + key, value = line.split("=", 1) + if not value: + value = empty_val + if add_empty or value: + data[key] = value + + return data + + # vi: ts=4 expandtab diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index a711404c..d24f817d 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -106,7 +106,7 @@ class CiTestCase(TestCase): return os.path.normpath(os.path.abspath(os.path.join(dir, path))) -class ResourceUsingTestCase(TestCase): +class ResourceUsingTestCase(CiTestCase): def setUp(self): super(ResourceUsingTestCase, self).setUp() self.resource_path = None @@ -229,8 +229,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def reRoot(self, root=None): if root is None: - root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, root) + root = self.tmp_dir() self.patchUtils(root) self.patchOS(root) return root @@ -256,7 +255,7 @@ def populate_dir(path, files): os.makedirs(path) ret = [] for (name, content) in files.items(): - p = os.path.join(path, name) + p = os.path.sep.join([path, name]) util.ensure_dir(os.path.dirname(p)) with open(p, "wb") as fp: if isinstance(content, six.binary_type): diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 189caca8..490760d1 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -712,4 +712,73 @@ class TestProcessExecutionError(helpers.TestCase): )).format(description=self.empty_description, empty_attr=self.empty_attr)) + +class TestSystemIsSnappy(helpers.FilesystemMockingTestCase): + def test_id_in_os_release_quoted(self): + """os-release containing ID="ubuntu-core" is snappy.""" + orcontent = '\n'.join(['ID="ubuntu-core"', '']) + root_d = self.tmp_dir() + helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + self.reRoot(root_d) + self.assertTrue(util.system_is_snappy()) + + def test_id_in_os_release(self): + """os-release containing ID=ubuntu-core is snappy.""" + orcontent = '\n'.join(['ID=ubuntu-core', '']) + root_d = self.tmp_dir() + helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + self.reRoot(root_d) + self.assertTrue(util.system_is_snappy()) + + @mock.patch('cloudinit.util.get_cmdline') + def test_bad_content_in_os_release_no_effect(self, m_cmdline): + """malformed os-release should not raise exception.""" + m_cmdline.return_value = 'root=/dev/sda' + orcontent = '\n'.join(['IDubuntu-core', '']) + root_d = self.tmp_dir() + helpers.populate_dir(root_d, {'etc/os-release': orcontent}) + self.reRoot() + self.assertFalse(util.system_is_snappy()) + + @mock.patch('cloudinit.util.get_cmdline') + def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): + """The string snap_core= in kernel cmdline indicates snappy.""" + cmdline = ( + "BOOT_IMAGE=(loop)/kernel.img root=LABEL=writable " + "snap_core=core_x1.snap snap_kernel=pc-kernel_x1.snap ro " + "net.ifnames=0 init=/lib/systemd/systemd console=tty1 " + "console=ttyS0 panic=-1") + m_cmdline.return_value = cmdline + self.assertTrue(util.system_is_snappy()) + self.assertTrue(m_cmdline.call_count > 0) + + @mock.patch('cloudinit.util.get_cmdline') + def test_nothing_found_is_not_snappy(self, m_cmdline): + """If no positive identification, then not snappy.""" + m_cmdline.return_value = 'root=/dev/sda' + self.reRoot() + self.assertFalse(util.system_is_snappy()) + self.assertTrue(m_cmdline.call_count > 0) + + @mock.patch('cloudinit.util.get_cmdline') + def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): + """A Channel.ini file with 'ubuntu-core' indicates snappy.""" + m_cmdline.return_value = 'root=/dev/sda' + root_d = self.tmp_dir() + content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""]) + helpers.populate_dir( + root_d, {'etc/system-image/channel.ini': content}) + self.reRoot(root_d) + self.assertTrue(util.system_is_snappy()) + + @mock.patch('cloudinit.util.get_cmdline') + def test_system_image_config_dir_is_snappy(self, m_cmdline): + """Existence of /etc/system-image/config.d indicates snappy.""" + m_cmdline.return_value = 'root=/dev/sda' + root_d = self.tmp_dir() + helpers.populate_dir( + root_d, {'etc/system-image/config.d/my.file': "_unused"}) + self.reRoot(root_d) + self.assertTrue(util.system_is_snappy()) + # vi: ts=4 expandtab |