from cloudinit import helpers from cloudinit.sources import DataSourceAzure from tests.unittests.helpers import populate_dir import base64 from mocker import MockerTestCase import os import yaml def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): if data is None: data = {'HostName': 'FOOHOST'} if pubkeys is None: pubkeys = {} content = """ 1.0 LinuxProvisioningConfiguration """ for key, val in data.items(): content += "<%s>%s\n" % (key, val, key) if userdata: content += "%s\n" % (base64.b64encode(userdata)) if pubkeys: content += "\n" for fp, path in pubkeys: content += " " content += ("%s%s" % (fp, path)) content += "\n" content += "" content += """ 1.0 kms.core.windows.net false """ return content class TestAzureDataSource(MockerTestCase): def setUp(self): # makeDir comes from MockerTestCase self.tmp = self.makeDir() # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = helpers.Paths({'cloud_dir': self.tmp}) self.unapply = [] super(TestAzureDataSource, self).setUp() def tearDown(self): apply_patches([i for i in reversed(self.unapply)]) super(TestAzureDataSource, self).tearDown() def apply_patches(self, patches): ret = apply_patches(patches) self.unapply += ret def _get_ds(self, data): def dsdevs(): return data.get('dsdevs', []) def _invoke_agent(cmd): data['agent_invoked'] = cmd def _write_files(datadir, files, dirmode): data['files'] = {} data['datadir'] = datadir data['datadir_mode'] = dirmode for (fname, content) in files.items(): data['files'][fname] = content def _wait_for_files(flist, _maxwait=None, _naplen=None): data['waited'] = flist return [] def _pubkeys_from_crt_files(flist): data['pubkey_files'] = flist return ["pubkey_from: %s" % f for f in flist] def _iid_from_shared_config(path): data['iid_from_shared_cfg'] = path return 'i-my-azure-id' if data.get('ovfcontent') is not None: populate_dir(os.path.join(self.paths.seed_dir, "azure"), {'ovf-env.xml': data['ovfcontent']}) mod = DataSourceAzure if data.get('dsdevs'): self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) self.apply_patches([(mod, 'invoke_agent', _invoke_agent), (mod, 'write_files', _write_files), (mod, 'wait_for_files', _wait_for_files), (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files), (mod, 'iid_from_shared_config', _iid_from_shared_config), ]) dsrc = mod.DataSourceAzureNet( data.get('sys_cfg', {}), distro=None, paths=self.paths) return dsrc def test_basic_seed_dir(self): odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata), 'sys_cfg': {}} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, "") self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName']) self.assertTrue('ovf-env.xml' in data['files']) self.assertEqual(0700, data['datadir_mode']) self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id') def test_user_cfg_set_agent_command(self): cfg = {'agent_command': "my_command"} odata = {'HostName': "myhost", 'UserName': "myuser", 'dscfg': yaml.dump(cfg)} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(data['agent_invoked'], cfg['agent_command']) def test_sys_cfg_set_agent_command(self): sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}} data = {'ovfcontent': construct_valid_ovf_env(data={}), 'sys_cfg': sys_cfg} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(data['agent_invoked'], '_COMMAND') def test_username_used(self): odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.cfg['system_info']['default_user']['name'], "myuser") def test_password_given(self): odata = {'HostName': "myhost", 'UserName': "myuser", 'UserPassword': "mypass"} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertTrue('default_user' in dsrc.cfg['system_info']) defuser = dsrc.cfg['system_info']['default_user'] # default user shoudl be updated for password and username # and should not be locked. self.assertEqual(defuser['name'], odata['UserName']) self.assertEqual(defuser['password'], odata['UserPassword']) self.assertFalse(defuser['lock_passwd']) def test_userdata_found(self): mydata = "FOOBAR" odata = {'UserData': base64.b64encode(mydata)} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, mydata) def test_no_datasource_expected(self): #no source should be found if no seed_dir and no devs data = {} dsrc = self._get_ds({}) ret = dsrc.get_data() self.assertFalse(ret) self.assertFalse('agent_invoked' in data) def test_cfg_has_pubkeys(self): odata = {'HostName': "myhost", 'UserName': "myuser"} mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] data = {'ovfcontent': construct_valid_ovf_env(data=odata, pubkeys=pubkeys)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) for mypk in mypklist: self.assertIn(mypk, dsrc.cfg['_pubkeys']) class TestReadAzureOvf(MockerTestCase): def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "" + construct_valid_ovf_env(data={}) self.assertRaises(DataSourceAzure.NonAzureDataSource, DataSourceAzure.read_azure_ovf, invalid_xml) def test_load_with_pubkeys(self): mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] content = construct_valid_ovf_env(pubkeys=pubkeys) (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content) for mypk in mypklist: self.assertIn(mypk, cfg['_pubkeys']) class TestReadAzureSharedConfig(MockerTestCase): def test_valid_content(self): xml = """ """ ret = DataSourceAzure.iid_from_shared_config_content(xml) self.assertEqual("MY_INSTANCE_ID", ret) def apply_patches(patches): ret = [] for (ref, name, replace) in patches: if replace is None: continue orig = getattr(ref, name) setattr(ref, name, replace) ret.append((ref, name, orig)) return ret