From fb55c1079375454d2a2a2f82c6c1812759eeb1f1 Mon Sep 17 00:00:00 2001 From: Ben Howard Date: Fri, 24 Jan 2014 12:29:04 -0700 Subject: Fixes for SmartOS datasource (LP: #1272115): 1. fixed conflation of user-data and cloud-init user-data. Cloud-init user-data is now namespaced as 'cloud-init:user-data'. 2. user-scripts are now fetched from the meta-data service each boot and executed as in the scripts directory 3. datacenter name is now namespaced as sdc:datacenter 4. user-scripts should be shebanged if there is no file magic --- tests/unittests/test_datasource/test_smartos.py | 145 ++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 8 deletions(-) (limited to 'tests') diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 956767d8..ae427bb5 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -27,6 +27,10 @@ from cloudinit import helpers from cloudinit.sources import DataSourceSmartOS from mocker import MockerTestCase +import os +import os.path +import re +import stat import uuid MOCK_RETURNS = { @@ -35,7 +39,11 @@ MOCK_RETURNS = { 'disable_iptables_flag': None, 'enable_motd_sys_info': None, 'test-var1': 'some data', - 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), + 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), + 'sdc:datacenter_name': 'somewhere2', + 'sdc:operator-script': '\n'.join(['bin/true', '']), + 'user-data': '\n'.join(['something', '']), + 'user-script': '\n'.join(['/bin/true', '']), } DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') @@ -101,6 +109,7 @@ class TestSmartOSDataSource(MockerTestCase): def setUp(self): # makeDir comes from MockerTestCase self.tmp = self.makeDir() + self.legacy_user_d = self.makeDir() # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = helpers.Paths({'cloud_dir': self.tmp}) @@ -138,6 +147,7 @@ class TestSmartOSDataSource(MockerTestCase): sys_cfg['datasource'] = sys_cfg.get('datasource', {}) sys_cfg['datasource']['SmartOS'] = ds_cfg + self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) self.apply_patches([(mod, 'get_serial', _get_serial)]) self.apply_patches([(mod, 'dmi_data', _dmi_data)]) dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, @@ -194,7 +204,7 @@ class TestSmartOSDataSource(MockerTestCase): # metadata provided base64_all of true my_returns = MOCK_RETURNS.copy() my_returns['base64_all'] = "true" - for k in ('hostname', 'user-data'): + for k in ('hostname', 'cloud-init:user-data'): my_returns[k] = base64.b64encode(my_returns[k]) dsrc = self._get_ds(mockdata=my_returns) @@ -202,7 +212,7 @@ class TestSmartOSDataSource(MockerTestCase): self.assertTrue(ret) self.assertEquals(MOCK_RETURNS['hostname'], dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['user-data'], + self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], dsrc.userdata_raw) self.assertEquals(MOCK_RETURNS['root_authorized_keys'], dsrc.metadata['public-keys']) @@ -213,9 +223,9 @@ class TestSmartOSDataSource(MockerTestCase): def test_b64_userdata(self): my_returns = MOCK_RETURNS.copy() - my_returns['b64-user-data'] = "true" + my_returns['b64-cloud-init:user-data'] = "true" my_returns['b64-hostname'] = "true" - for k in ('hostname', 'user-data'): + for k in ('hostname', 'cloud-init:user-data'): my_returns[k] = base64.b64encode(my_returns[k]) dsrc = self._get_ds(mockdata=my_returns) @@ -223,7 +233,8 @@ class TestSmartOSDataSource(MockerTestCase): self.assertTrue(ret) self.assertEquals(MOCK_RETURNS['hostname'], dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], + dsrc.userdata_raw) self.assertEquals(MOCK_RETURNS['root_authorized_keys'], dsrc.metadata['public-keys']) @@ -238,13 +249,131 @@ class TestSmartOSDataSource(MockerTestCase): self.assertTrue(ret) self.assertEquals(MOCK_RETURNS['hostname'], dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], + dsrc.userdata_raw) def test_userdata(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['user-data'], + dsrc.metadata['legacy-user-data']) + self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], + dsrc.userdata_raw) + + def test_sdc_scripts(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) + + legacy_script_f = "%s/user-script" % self.legacy_user_d + self.assertTrue(os.path.exists(legacy_script_f)) + self.assertTrue(os.path.islink(legacy_script_f)) + user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] + self.assertEquals(user_script_perm, '700') + + def test_scripts_shebanged(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) + + legacy_script_f = "%s/user-script" % self.legacy_user_d + self.assertTrue(os.path.exists(legacy_script_f)) + self.assertTrue(os.path.islink(legacy_script_f)) + shebang = None + with open(legacy_script_f, 'r') as f: + shebang = f.readlines()[0].strip() + self.assertEquals(shebang, "#!/bin/bash") + user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] + self.assertEquals(user_script_perm, '700') + + def test_scripts_shebang_not_added(self): + """ + Test that the SmartOS requirement that plain text scripts + are executable. This test makes sure that plain texts scripts + with out file magic have it added appropriately by cloud-init. + """ + + my_returns = MOCK_RETURNS.copy() + my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl', + 'print("hi")', '']) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(my_returns['user-script'], + dsrc.metadata['user-script']) + + legacy_script_f = "%s/user-script" % self.legacy_user_d + self.assertTrue(os.path.exists(legacy_script_f)) + self.assertTrue(os.path.islink(legacy_script_f)) + shebang = None + with open(legacy_script_f, 'r') as f: + shebang = f.readlines()[0].strip() + self.assertEquals(shebang, "#!/usr/bin/perl") + + def test_scripts_removed(self): + """ + Since SmartOS requires that the user script is fetched + each boot, we want to make sure that the information + is backed-up for user-review later. + + This tests the behavior of when a script is removed. It makes + sure that a) the previous script is backed-up; and 2) that + there is no script remaining. + """ + + script_d = os.path.join(self.tmp, "scripts", "per-boot") + os.makedirs(script_d) + + test_script_f = "%s/99_user_script" % script_d + with open(test_script_f, 'w') as f: + f.write("TEST DATA") + + my_returns = MOCK_RETURNS.copy() + del my_returns['user-script'] + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertFalse(dsrc.metadata['user-script']) + self.assertFalse(os.path.exists(test_script_f)) + + def test_userdata_removed(self): + """ + User-data in the SmartOS world is supposed to be written to a file + each and every boot. This tests to make sure that in the event the + legacy user-data is removed, the existing user-data is backed-up and + there is no /var/db/user-data left. + """ + + user_data_f = "%s/mdata-user-data" % self.legacy_user_d + with open(user_data_f, 'w') as f: + f.write("PREVIOUS") + + my_returns = MOCK_RETURNS.copy() + del my_returns['user-data'] + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertFalse(dsrc.metadata.get('legacy-user-data')) + + found_new = False + for root, _dirs, files in os.walk(self.legacy_user_d): + for name in files: + name_f = os.path.join(root, name) + permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:] + if re.match(r'.*\/mdata-user-data$', name_f): + found_new = True + print name_f + self.assertEquals(permissions, '400') + + self.assertFalse(found_new) def test_disable_iptables_flag(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) -- cgit v1.2.3