summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_azure.py60
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py267
-rw-r--r--tests/unittests/test_datasource/test_smartos.py183
3 files changed, 470 insertions, 40 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 1ca6a79d..aad84206 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -2,8 +2,8 @@ from cloudinit import helpers
from cloudinit.sources import DataSourceAzure
from tests.unittests.helpers import populate_dir
-import crypt
import base64
+import crypt
from mocker import MockerTestCase
import os
import yaml
@@ -120,8 +120,7 @@ class TestAzureDataSource(MockerTestCase):
mod = DataSourceAzure
- if data.get('dsdevs'):
- self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
+ self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
(mod, 'write_files', _write_files),
@@ -154,9 +153,12 @@ class TestAzureDataSource(MockerTestCase):
def test_user_cfg_set_agent_command_plain(self):
# set dscfg in via plaintext
- cfg = {'agent_command': "my_command"}
+ # we must have friendly-to-xml formatted plaintext in yaml_cfg
+ # not all plaintext is expected to work.
+ yaml_cfg = "{agent_command: my_command}\n"
+ cfg = yaml.safe_load(yaml_cfg)
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}}
+ 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -290,11 +292,57 @@ class TestAzureDataSource(MockerTestCase):
self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
+ def test_default_ephemeral(self):
+ # make sure the ephemeral device works
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ cfg = dsrc.get_config_obj()
+
+ self.assertEquals(dsrc.device_name_to_device("ephemeral0"),
+ "/dev/sdb")
+ assert 'disk_setup' in cfg
+ assert 'fs_setup' in cfg
+ self.assertIsInstance(cfg['disk_setup'], dict)
+ self.assertIsInstance(cfg['fs_setup'], list)
+
+ def test_provide_disk_aliases(self):
+ # Make sure that user can affect disk aliases
+ dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
+ 'encoding': 'base64'}}
+ usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
+ 'ephemeral0': False}}
+ userdata = '#cloud-config' + yaml.dump(usercfg) + "\n"
+
+ ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata)
+ data = {'ovfcontent': ovfcontent, 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ cfg = dsrc.get_config_obj()
+ self.assertTrue(cfg)
+
+ def test_userdata_arrives(self):
+ userdata = "This is my user-data"
+ xml = construct_valid_ovf_env(data={}, userdata=userdata)
+ data = {'ovfcontent': xml}
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+
+ self.assertEqual(userdata, dsrc.userdata_raw)
+
class TestReadAzureOvf(MockerTestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
- self.assertRaises(DataSourceAzure.NonAzureDataSource,
+ self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
DataSourceAzure.read_azure_ovf, invalid_xml)
def test_load_with_pubkeys(self):
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
new file mode 100644
index 00000000..e1812a88
--- /dev/null
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -0,0 +1,267 @@
+from cloudinit import helpers
+from cloudinit.sources import DataSourceOpenNebula as ds
+from cloudinit import util
+from mocker import MockerTestCase
+from tests.unittests.helpers import populate_dir
+
+import os
+import pwd
+
+TEST_VARS = {
+ 'VAR1': 'single',
+ 'VAR2': 'double word',
+ 'VAR3': 'multi\nline\n',
+ 'VAR4': "'single'",
+ 'VAR5': "'double word'",
+ 'VAR6': "'multi\nline\n'",
+ 'VAR7': 'single\\t',
+ 'VAR8': 'double\\tword',
+ 'VAR9': 'multi\\t\nline\n',
+ 'VAR10': '\\', # expect \
+ 'VAR11': '\'', # expect '
+ 'VAR12': '$', # expect $
+}
+
+INVALID_CONTEXT = ';'
+USER_DATA = '#cloud-config\napt_upgrade: true'
+SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
+HOSTNAME = 'foo.example.com'
+PUBLIC_IP = '10.0.0.3'
+
+CMD_IP_OUT = '''\
+1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
+ link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
+2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
+ link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
+'''
+
+
+class TestOpenNebulaDataSource(MockerTestCase):
+ parsed_user = None
+
+ def setUp(self):
+ super(TestOpenNebulaDataSource, self).setUp()
+ self.tmp = self.makeDir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ # defaults for few tests
+ self.ds = ds.DataSourceOpenNebula
+ self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
+ self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
+
+ # we don't want 'sudo' called in tests. so we patch switch_user_cmd
+ def my_switch_user_cmd(user):
+ self.parsed_user = user
+ return []
+
+ self.switch_user_cmd_real = ds.switch_user_cmd
+ ds.switch_user_cmd = my_switch_user_cmd
+
+ def tearDown(self):
+ ds.switch_user_cmd = self.switch_user_cmd_real
+ super(TestOpenNebulaDataSource, self).tearDown()
+
+ def test_get_data_non_contextdisk(self):
+ orig_find_devs_with = util.find_devs_with
+ try:
+ # dont' try to lookup for CDs
+ util.find_devs_with = lambda n: []
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data_broken_contextdisk(self):
+ orig_find_devs_with = util.find_devs_with
+ try:
+ # dont' try to lookup for CDs
+ util.find_devs_with = lambda n: []
+ populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data_invalid_identity(self):
+ orig_find_devs_with = util.find_devs_with
+ try:
+ # generate non-existing system user name
+ sys_cfg = self.sys_cfg
+ invalid_user = 'invalid'
+ while not sys_cfg['datasource']['OpenNebula'].get('parseuser'):
+ try:
+ pwd.getpwnam(invalid_user)
+ invalid_user += 'X'
+ except KeyError:
+ sys_cfg['datasource']['OpenNebula']['parseuser'] = \
+ invalid_user
+
+ # dont' try to lookup for CDs
+ util.find_devs_with = lambda n: []
+ populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
+ dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_get_data(self):
+ orig_find_devs_with = util.find_devs_with
+ try:
+ # dont' try to lookup for CDs
+ util.find_devs_with = lambda n: []
+ populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
+ dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+ def test_seed_dir_non_contextdisk(self):
+ self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
+ self.seed_dir)
+
+ def test_seed_dir_empty1_context(self):
+ populate_dir(self.seed_dir, {'context.sh': ''})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertEqual(results['userdata'], None)
+ self.assertEqual(results['metadata'], {})
+
+ def test_seed_dir_empty2_context(self):
+ populate_context_dir(self.seed_dir, {})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertEqual(results['userdata'], None)
+ self.assertEqual(results['metadata'], {})
+
+ def test_seed_dir_broken_context(self):
+ populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
+
+ self.assertRaises(ds.BrokenContextDiskDir,
+ ds.read_context_disk_dir,
+ self.seed_dir)
+
+ def test_context_parser(self):
+ populate_context_dir(self.seed_dir, TEST_VARS)
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('metadata' in results)
+ self.assertEqual(TEST_VARS, results['metadata'])
+
+ def test_ssh_key(self):
+ public_keys = ['first key', 'second key']
+ for c in range(4):
+ for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
+ my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
+ populate_context_dir(my_d, {k: '\n'.join(public_keys)})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('metadata' in results)
+ self.assertTrue('public-keys' in results['metadata'])
+ self.assertEqual(public_keys,
+ results['metadata']['public-keys'])
+
+ public_keys.append(SSH_KEY % (c + 1,))
+
+ def test_user_data(self):
+ for k in ('USER_DATA', 'USERDATA'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: USER_DATA})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('userdata' in results)
+ self.assertEqual(USER_DATA, results['userdata'])
+
+ def test_hostname(self):
+ for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: PUBLIC_IP})
+ results = ds.read_context_disk_dir(my_d)
+
+ self.assertTrue('metadata' in results)
+ self.assertTrue('local-hostname' in results['metadata'])
+ self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
+
+ def test_network_interfaces(self):
+ populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+
+ def test_find_candidates(self):
+ def my_devs_with(criteria):
+ return {
+ "LABEL=CONTEXT": ["/dev/sdb"],
+ "LABEL=CDROM": ["/dev/sr0"],
+ "TYPE=iso9660": ["/dev/vdb"],
+ }.get(criteria, [])
+
+ orig_find_devs_with = util.find_devs_with
+ try:
+ util.find_devs_with = my_devs_with
+ self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"],
+ ds.find_candidate_devs())
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+
+class TestOpenNebulaNetwork(MockerTestCase):
+
+ def setUp(self):
+ super(TestOpenNebulaNetwork, self).setUp()
+
+ def test_lo(self):
+ net = ds.OpenNebulaNetwork('', {})
+ self.assertEqual(net.gen_conf(), u'''\
+auto lo
+iface lo inet loopback
+''')
+
+ def test_eth0(self):
+ net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
+ self.assertEqual(net.gen_conf(), u'''\
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 10.18.1.1
+ network 10.18.1.0
+ netmask 255.255.255.0
+''')
+
+ def test_eth0_override(self):
+ context = {
+ 'DNS': '1.2.3.8',
+ 'ETH0_IP': '1.2.3.4',
+ 'ETH0_NETWORK': '1.2.3.0',
+ 'ETH0_MASK': '255.255.0.0',
+ 'ETH0_GATEWAY': '1.2.3.5',
+ 'ETH0_DOMAIN': 'example.com',
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7'
+ }
+
+ net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
+ self.assertEqual(net.gen_conf(), u'''\
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 1.2.3.4
+ network 1.2.3.0
+ netmask 255.255.0.0
+ gateway 1.2.3.5
+ dns-search example.com
+ dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
+''')
+
+
+def populate_context_dir(path, variables):
+ data = "# Context variables generated by OpenNebula\n"
+ for (k, v) in variables.iteritems():
+ data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
+ populate_dir(path, {'context.sh': data})
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 6c12f1e2..956767d8 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,25 +22,24 @@
# return responses.
#
+import base64
from cloudinit import helpers
from cloudinit.sources import DataSourceSmartOS
from mocker import MockerTestCase
import uuid
-mock_returns = {
+MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
'disable_iptables_flag': None,
'enable_motd_sys_info': None,
- 'system_uuid': str(uuid.uuid4()),
- 'smartdc': 'smartdc',
- 'userdata': """
-#!/bin/sh
-/bin/true
-""",
+ 'test-var1': 'some data',
+ 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
}
+DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
+
class MockSerial(object):
"""Fake a serial terminal for testing the code that
@@ -48,12 +47,13 @@ class MockSerial(object):
port = None
- def __init__(self):
+ def __init__(self, mockdata):
self.last = None
self.last = None
self.new = True
self.count = 0
self.mocked_out = []
+ self.mockdata = mockdata
def open(self):
return True
@@ -71,30 +71,30 @@ class MockSerial(object):
def readline(self):
if self.new:
self.new = False
- if self.last in mock_returns:
+ if self.last in self.mockdata:
return 'SUCCESS\n'
else:
return 'NOTFOUND %s\n' % self.last
- if self.last in mock_returns:
+ if self.last in self.mockdata:
if not self.mocked_out:
self.mocked_out = [x for x in self._format_out()]
- print self.mocked_out
if len(self.mocked_out) > self.count:
self.count += 1
return self.mocked_out[self.count - 1]
def _format_out(self):
- if self.last in mock_returns:
+ if self.last in self.mockdata:
+ _mret = self.mockdata[self.last]
try:
- for l in mock_returns[self.last].splitlines():
- yield "%s\n" % l
+ for l in _mret.splitlines():
+ yield "%s\n" % l.rstrip()
except:
- yield "%s\n" % mock_returns[self.last]
+ yield "%s\n" % _mret.rstrip()
- yield '\n'
yield '.'
+ yield '\n'
class TestSmartOSDataSource(MockerTestCase):
@@ -116,23 +116,36 @@ class TestSmartOSDataSource(MockerTestCase):
ret = apply_patches(patches)
self.unapply += ret
- def _get_ds(self):
+ def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):
+ mod = DataSourceSmartOS
+
+ if mockdata is None:
+ mockdata = MOCK_RETURNS
+
+ if dmi_data is None:
+ dmi_data = DMI_DATA_RETURN
def _get_serial(*_):
- return MockSerial()
+ return MockSerial(mockdata)
def _dmi_data():
- return mock_returns['system_uuid'], 'smartdc'
+ return dmi_data
+
+ if sys_cfg is None:
+ sys_cfg = {}
+
+ if ds_cfg is not None:
+ sys_cfg['datasource'] = sys_cfg.get('datasource', {})
+ sys_cfg['datasource']['SmartOS'] = ds_cfg
- data = {'sys_cfg': {}}
- mod = DataSourceSmartOS
self.apply_patches([(mod, 'get_serial', _get_serial)])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
- dsrc = mod.DataSourceSmartOS(
- data.get('sys_cfg', {}), distro=None, paths=self.paths)
+ dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
+ paths=self.paths)
return dsrc
def test_seed(self):
+ # default seed should be /dev/ttyS1
dsrc = self._get_ds()
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -144,41 +157,143 @@ class TestSmartOSDataSource(MockerTestCase):
self.assertTrue(ret)
self.assertTrue(dsrc.is_smartdc)
+ def test_no_base64(self):
+ ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
+ dsrc = self._get_ds(ds_cfg=ds_cfg)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
def test_uuid(self):
- dsrc = self._get_ds()
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(mock_returns['system_uuid'],
- dsrc.metadata['instance-id'])
+ self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])
def test_root_keys(self):
- dsrc = self._get_ds()
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(mock_returns['root_authorized_keys'],
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
dsrc.metadata['public-keys'])
+ def test_hostname_b64(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
def test_hostname(self):
- dsrc = self._get_ds()
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_base64_all(self):
+ # metadata provided base64_all of true
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_all'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(mock_returns['hostname'],
+ self.assertEquals(MOCK_RETURNS['hostname'],
dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'],
+ dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+ def test_b64_userdata(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['b64-user-data'] = "true"
+ my_returns['b64-hostname'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_b64_keys(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_keys'] = 'hostname,ignored'
+ for k in ('hostname',):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+
+ def test_userdata(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
def test_disable_iptables_flag(self):
- dsrc = self._get_ds()
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(mock_returns['disable_iptables_flag'],
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
dsrc.metadata['iptables_disable'])
def test_motd_sys_info(self):
- dsrc = self._get_ds()
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(mock_returns['enable_motd_sys_info'],
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
dsrc.metadata['motd_sys_info'])
+ def test_default_ephemeral(self):
+ # Test to make sure that the builtin config has the ephemeral
+ # configuration.
+ dsrc = self._get_ds()
+ cfg = dsrc.get_config_obj()
+
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ assert 'disk_setup' in cfg
+ assert 'fs_setup' in cfg
+ self.assertIsInstance(cfg['disk_setup'], dict)
+ self.assertIsInstance(cfg['fs_setup'], list)
+
+ def test_override_disk_aliases(self):
+ # Test to make sure that the built-in DS is overriden
+ builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG
+
+ mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}}
+
+ # expect that these values are in builtin, or this is pointless
+ for k in mydscfg:
+ self.assertIn(k, builtin)
+
+ dsrc = self._get_ds(ds_cfg=mydscfg)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ self.assertEqual(mydscfg['disk_aliases']['FOO'],
+ dsrc.ds_cfg['disk_aliases']['FOO'])
+
+ self.assertEqual(dsrc.device_name_to_device('FOO'),
+ mydscfg['disk_aliases']['FOO'])
+
def apply_patches(patches):
ret = []