import json from cloudinit import helpers from cloudinit import distros from cloudinit.sources import DataSourceRbxCloud as ds from cloudinit.tests.helpers import mock, CiTestCase, populate_dir from cloudinit import subp DS_PATH = "cloudinit.sources.DataSourceRbxCloud" CRYPTO_PASS = "$6$uktth46t$FvpDzFD2iL9YNZIG1Epz7957hJqbH0f" \ "QKhnzcfBcUhEodGAWRqTy7tYG4nEW7SUOYBjxOSFIQW5" \ "tToyGP41.s1" CLOUD_METADATA = { "vm": { "memory": 4, "cpu": 2, "name": "vm-image-builder", "_id": "5beab44f680cffd11f0e60fc" }, "additionalMetadata": { "username": "guru", "sshKeys": ["ssh-rsa ..."], "password": { "sha512": CRYPTO_PASS } }, "disk": [ {"size": 10, "type": "ssd", "name": "vm-image-builder-os", "_id": "5beab450680cffd11f0e60fe"}, {"size": 2, "type": "ssd", "name": "ubuntu-1804-bionic", "_id": "5bef002c680cffd11f107590"} ], "netadp": [ { "ip": [{"address": "62.181.8.174"}], "network": { "dns": {"nameservers": ["8.8.8.8", "8.8.4.4"]}, "routing": [], "gateway": "62.181.8.1", "netmask": "255.255.248.0", "name": "public", "type": "public", "_id": "5784e97be2627505227b578c" }, "speed": 1000, "type": "hv", "macaddress": "00:15:5D:FF:0F:03", "_id": "5beab450680cffd11f0e6102" }, { "ip": [{"address": "10.209.78.11"}], "network": { "dns": {"nameservers": ["9.9.9.9", "8.8.8.8"]}, "routing": [], "gateway": "10.209.78.1", "netmask": "255.255.255.0", "name": "network-determined-bardeen", "type": "private", "_id": "5beaec64680cffd11f0e7c31" }, "speed": 1000, "type": "hv", "macaddress": "00:15:5D:FF:0F:24", "_id": "5bec18c6680cffd11f0f0d8b" } ], "dvddrive": [{"iso": {}}] } class TestRbxDataSource(CiTestCase): parsed_user = None allowed_subp = ['bash'] def _fetch_distro(self, kind): cls = distros.fetch(kind) paths = helpers.Paths({}) return cls(kind, {}, paths) def setUp(self): super(TestRbxDataSource, self).setUp() self.tmp = self.tmp_dir() self.paths = helpers.Paths( {'cloud_dir': self.tmp, 'run_dir': self.tmp} ) # defaults for few tests self.ds = ds.DataSourceRbxCloud self.seed_dir = self.paths.seed_dir self.sys_cfg = {'datasource': {'RbxCloud': {'dsmode': 'local'}}} def test_seed_read_user_data_callback_empty_file(self): populate_user_metadata(self.seed_dir, '') populate_cloud_metadata(self.seed_dir, {}) results = ds.read_user_data_callback(self.seed_dir) self.assertIsNone(results) def test_seed_read_user_data_callback_valid_disk(self): populate_user_metadata(self.seed_dir, '') populate_cloud_metadata(self.seed_dir, CLOUD_METADATA) results = ds.read_user_data_callback(self.seed_dir) self.assertNotEqual(results, None) self.assertTrue('userdata' in results) self.assertTrue('metadata' in results) self.assertTrue('cfg' in results) def test_seed_read_user_data_callback_userdata(self): userdata = "#!/bin/sh\nexit 1" populate_user_metadata(self.seed_dir, userdata) populate_cloud_metadata(self.seed_dir, CLOUD_METADATA) results = ds.read_user_data_callback(self.seed_dir) self.assertNotEqual(results, None) self.assertTrue('userdata' in results) self.assertEqual(results['userdata'], userdata) def test_generate_network_config(self): expected = { 'version': 1, 'config': [ { 'subnets': [ {'control': 'auto', 'dns_nameservers': ['8.8.8.8', '8.8.4.4'], 'netmask': '255.255.248.0', 'address': '62.181.8.174', 'type': 'static', 'gateway': '62.181.8.1'} ], 'type': 'physical', 'name': 'eth0', 'mac_address': '00:15:5d:ff:0f:03' }, { 'subnets': [ {'control': 'auto', 'dns_nameservers': ['9.9.9.9', '8.8.8.8'], 'netmask': '255.255.255.0', 'address': '10.209.78.11', 'type': 'static', 'gateway': '10.209.78.1'} ], 'type': 'physical', 'name': 'eth1', 'mac_address': '00:15:5d:ff:0f:24' } ] } self.assertTrue( ds.generate_network_config(CLOUD_METADATA['netadp']), expected ) @mock.patch(DS_PATH + '.subp.subp') def test_gratuitous_arp_run_standard_arping(self, m_subp): """Test handle run arping & parameters.""" items = [ { 'destination': '172.17.0.2', 'source': '172.16.6.104' }, { 'destination': '172.17.0.2', 'source': '172.16.6.104', }, ] ds.gratuitous_arp(items, self._fetch_distro('ubuntu')) self.assertEqual([ mock.call([ 'arping', '-c', '2', '-S', '172.16.6.104', '172.17.0.2' ]), mock.call([ 'arping', '-c', '2', '-S', '172.16.6.104', '172.17.0.2' ]) ], m_subp.call_args_list ) @mock.patch(DS_PATH + '.subp.subp') def test_handle_rhel_like_arping(self, m_subp): """Test handle on RHEL-like distros.""" items = [ { 'source': '172.16.6.104', 'destination': '172.17.0.2', } ] ds.gratuitous_arp(items, self._fetch_distro('fedora')) self.assertEqual([ mock.call( ['arping', '-c', '2', '-s', '172.16.6.104', '172.17.0.2'] )], m_subp.call_args_list ) @mock.patch( DS_PATH + '.subp.subp', side_effect=subp.ProcessExecutionError() ) def test_continue_on_arping_error(self, m_subp): """Continue when command error""" items = [ { 'destination': '172.17.0.2', 'source': '172.16.6.104' }, { 'destination': '172.17.0.2', 'source': '172.16.6.104', }, ] ds.gratuitous_arp(items, self._fetch_distro('ubuntu')) self.assertEqual([ mock.call([ 'arping', '-c', '2', '-S', '172.16.6.104', '172.17.0.2' ]), mock.call([ 'arping', '-c', '2', '-S', '172.16.6.104', '172.17.0.2' ]) ], m_subp.call_args_list ) def populate_cloud_metadata(path, data): populate_dir(path, {'cloud.json': json.dumps(data)}) def populate_user_metadata(path, data): populate_dir(path, {'user.data': data})