summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcloudinit/net/cmdline.py31
-rw-r--r--cloudinit/util.py37
-rw-r--r--tests/unittests/helpers.py7
-rw-r--r--tests/unittests/test_util.py69
4 files changed, 110 insertions, 34 deletions
diff --git a/cloudinit/net/cmdline.py b/cloudinit/net/cmdline.py
index 61e23697..38b27a52 100755
--- a/cloudinit/net/cmdline.py
+++ b/cloudinit/net/cmdline.py
@@ -9,41 +9,12 @@ import base64
import glob
import gzip
import io
-import shlex
-import sys
-
-import six
from . import get_devicelist
from . import read_sys_net_safe
from cloudinit import util
-PY26 = sys.version_info[0:2] == (2, 6)
-
-
-def _shlex_split(blob):
- if PY26 and isinstance(blob, six.text_type):
- # Older versions don't support unicode input
- blob = blob.encode("utf8")
- return shlex.split(blob)
-
-
-def _load_shell_content(content, add_empty=False, empty_val=None):
- """Given shell like syntax (key=value\nkey2=value2\n) in content
- return the data in dictionary form. If 'add_empty' is True
- then add entries in to the returned dictionary for 'VAR='
- variables. Set their value to empty_val."""
- data = {}
- for line in _shlex_split(content):
- key, value = line.split("=", 1)
- if not value:
- value = empty_val
- if add_empty or value:
- data[key] = value
-
- return data
-
def _klibc_to_config_entry(content, mac_addrs=None):
"""Convert a klibc written shell content file to a 'config' entry
@@ -63,7 +34,7 @@ def _klibc_to_config_entry(content, mac_addrs=None):
if mac_addrs is None:
mac_addrs = {}
- data = _load_shell_content(content)
+ data = util.load_shell_content(content)
try:
name = data['DEVICE'] if 'DEVICE' in data else data['DEVICE6']
except KeyError:
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 27a98330..67ff7ba3 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -24,6 +24,7 @@ import platform
import pwd
import random
import re
+import shlex
import shutil
import socket
import stat
@@ -75,6 +76,7 @@ CONTAINER_TESTS = (['systemd-detect-virt', '--quiet', '--container'],
PROC_CMDLINE = None
_LSB_RELEASE = {}
+PY26 = sys.version_info[0:2] == (2, 6)
def get_architecture(target=None):
@@ -2424,6 +2426,18 @@ def system_is_snappy():
# channel.ini is configparser loadable.
# snappy will move to using /etc/system-image/config.d/*.ini
# this is certainly not a perfect test, but good enough for now.
+ orpath = "/etc/os-release"
+ try:
+ orinfo = load_shell_content(load_file(orpath, quiet=True))
+ if orinfo.get('ID', '').lower() == "ubuntu-core":
+ return True
+ except ValueError as e:
+ LOG.warning("Unexpected error loading '%s': %s", orpath, e)
+
+ cmdline = get_cmdline()
+ if 'snap_core=' in cmdline:
+ return True
+
content = load_file("/etc/system-image/channel.ini", quiet=True)
if 'ubuntu-core' in content.lower():
return True
@@ -2470,4 +2484,27 @@ def rootdev_from_cmdline(cmdline):
return "/dev/" + found
+def load_shell_content(content, add_empty=False, empty_val=None):
+ """Given shell like syntax (key=value\nkey2=value2\n) in content
+ return the data in dictionary form. If 'add_empty' is True
+ then add entries in to the returned dictionary for 'VAR='
+ variables. Set their value to empty_val."""
+
+ def _shlex_split(blob):
+ if PY26 and isinstance(blob, six.text_type):
+ # Older versions don't support unicode input
+ blob = blob.encode("utf8")
+ return shlex.split(blob)
+
+ data = {}
+ for line in _shlex_split(content):
+ key, value = line.split("=", 1)
+ if not value:
+ value = empty_val
+ if add_empty or value:
+ data[key] = value
+
+ return data
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index a711404c..d24f817d 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -106,7 +106,7 @@ class CiTestCase(TestCase):
return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
-class ResourceUsingTestCase(TestCase):
+class ResourceUsingTestCase(CiTestCase):
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
self.resource_path = None
@@ -229,8 +229,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def reRoot(self, root=None):
if root is None:
- root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, root)
+ root = self.tmp_dir()
self.patchUtils(root)
self.patchOS(root)
return root
@@ -256,7 +255,7 @@ def populate_dir(path, files):
os.makedirs(path)
ret = []
for (name, content) in files.items():
- p = os.path.join(path, name)
+ p = os.path.sep.join([path, name])
util.ensure_dir(os.path.dirname(p))
with open(p, "wb") as fp:
if isinstance(content, six.binary_type):
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 189caca8..490760d1 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -712,4 +712,73 @@ class TestProcessExecutionError(helpers.TestCase):
)).format(description=self.empty_description,
empty_attr=self.empty_attr))
+
+class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
+ def test_id_in_os_release_quoted(self):
+ """os-release containing ID="ubuntu-core" is snappy."""
+ orcontent = '\n'.join(['ID="ubuntu-core"', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ def test_id_in_os_release(self):
+ """os-release containing ID=ubuntu-core is snappy."""
+ orcontent = '\n'.join(['ID=ubuntu-core', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_bad_content_in_os_release_no_effect(self, m_cmdline):
+ """malformed os-release should not raise exception."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ orcontent = '\n'.join(['IDubuntu-core', ''])
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ self.reRoot()
+ self.assertFalse(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_snap_core_in_cmdline_is_snappy(self, m_cmdline):
+ """The string snap_core= in kernel cmdline indicates snappy."""
+ cmdline = (
+ "BOOT_IMAGE=(loop)/kernel.img root=LABEL=writable "
+ "snap_core=core_x1.snap snap_kernel=pc-kernel_x1.snap ro "
+ "net.ifnames=0 init=/lib/systemd/systemd console=tty1 "
+ "console=ttyS0 panic=-1")
+ m_cmdline.return_value = cmdline
+ self.assertTrue(util.system_is_snappy())
+ self.assertTrue(m_cmdline.call_count > 0)
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_nothing_found_is_not_snappy(self, m_cmdline):
+ """If no positive identification, then not snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ self.reRoot()
+ self.assertFalse(util.system_is_snappy())
+ self.assertTrue(m_cmdline.call_count > 0)
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_channel_ini_with_snappy_is_snappy(self, m_cmdline):
+ """A Channel.ini file with 'ubuntu-core' indicates snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ root_d = self.tmp_dir()
+ content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""])
+ helpers.populate_dir(
+ root_d, {'etc/system-image/channel.ini': content})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_system_image_config_dir_is_snappy(self, m_cmdline):
+ """Existence of /etc/system-image/config.d indicates snappy."""
+ m_cmdline.return_value = 'root=/dev/sda'
+ root_d = self.tmp_dir()
+ helpers.populate_dir(
+ root_d, {'etc/system-image/config.d/my.file': "_unused"})
+ self.reRoot(root_d)
+ self.assertTrue(util.system_is_snappy())
+
# vi: ts=4 expandtab