diff options
Diffstat (limited to 'tests/unittests/sources/test_rbx.py')
-rw-r--r-- | tests/unittests/sources/test_rbx.py | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_rbx.py b/tests/unittests/sources/test_rbx.py new file mode 100644 index 00000000..475bf498 --- /dev/null +++ b/tests/unittests/sources/test_rbx.py @@ -0,0 +1,241 @@ +import json + +from cloudinit import distros, helpers, subp +from cloudinit.sources import DataSourceRbxCloud as ds +from tests.unittests.helpers import CiTestCase, mock, populate_dir + +DS_PATH = "cloudinit.sources.DataSourceRbxCloud" + +CRYPTO_PASS = ( + "$6$uktth46t$FvpDzFD2iL9YNZIG1Epz7957hJqbH0f" + "QKhnzcfBcUhEodGAWRqTy7tYG4nEW7SUOYBjxOSFIQW5" + "tToyGP41.s1" +) + +CLOUD_METADATA = { + "vm": { + "memory": 4, + "cpu": 2, + "name": "vm-image-builder", + "_id": "5beab44f680cffd11f0e60fc", + }, + "additionalMetadata": { + "username": "guru", + "sshKeys": ["ssh-rsa ..."], + "password": {"sha512": CRYPTO_PASS}, + }, + "disk": [ + { + "size": 10, + "type": "ssd", + "name": "vm-image-builder-os", + "_id": "5beab450680cffd11f0e60fe", + }, + { + "size": 2, + "type": "ssd", + "name": "ubuntu-1804-bionic", + "_id": "5bef002c680cffd11f107590", + }, + ], + "netadp": [ + { + "ip": [{"address": "62.181.8.174"}], + "network": { + "dns": {"nameservers": ["8.8.8.8", "8.8.4.4"]}, + "routing": [], + "gateway": "62.181.8.1", + "netmask": "255.255.248.0", + "name": "public", + "type": "public", + "_id": "5784e97be2627505227b578c", + }, + "speed": 1000, + "type": "hv", + "macaddress": "00:15:5D:FF:0F:03", + "_id": "5beab450680cffd11f0e6102", + }, + { + "ip": [{"address": "10.209.78.11"}], + "network": { + "dns": {"nameservers": ["9.9.9.9", "8.8.8.8"]}, + "routing": [], + "gateway": "10.209.78.1", + "netmask": "255.255.255.0", + "name": "network-determined-bardeen", + "type": "private", + "_id": "5beaec64680cffd11f0e7c31", + }, + "speed": 1000, + "type": "hv", + "macaddress": "00:15:5D:FF:0F:24", + "_id": "5bec18c6680cffd11f0f0d8b", + }, + ], + "dvddrive": [{"iso": {}}], +} + + +class TestRbxDataSource(CiTestCase): + parsed_user = None + allowed_subp = ["bash"] + + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) + + def setUp(self): + super(TestRbxDataSource, self).setUp() + self.tmp = self.tmp_dir() + self.paths = helpers.Paths( + {"cloud_dir": self.tmp, "run_dir": self.tmp} + ) + + # defaults for few tests + self.ds = ds.DataSourceRbxCloud + self.seed_dir = self.paths.seed_dir + self.sys_cfg = {"datasource": {"RbxCloud": {"dsmode": "local"}}} + + def test_seed_read_user_data_callback_empty_file(self): + populate_user_metadata(self.seed_dir, "") + populate_cloud_metadata(self.seed_dir, {}) + results = ds.read_user_data_callback(self.seed_dir) + + self.assertIsNone(results) + + def test_seed_read_user_data_callback_valid_disk(self): + populate_user_metadata(self.seed_dir, "") + populate_cloud_metadata(self.seed_dir, CLOUD_METADATA) + results = ds.read_user_data_callback(self.seed_dir) + + self.assertNotEqual(results, None) + self.assertTrue("userdata" in results) + self.assertTrue("metadata" in results) + self.assertTrue("cfg" in results) + + def test_seed_read_user_data_callback_userdata(self): + userdata = "#!/bin/sh\nexit 1" + populate_user_metadata(self.seed_dir, userdata) + populate_cloud_metadata(self.seed_dir, CLOUD_METADATA) + + results = ds.read_user_data_callback(self.seed_dir) + + self.assertNotEqual(results, None) + self.assertTrue("userdata" in results) + self.assertEqual(results["userdata"], userdata) + + def test_generate_network_config(self): + expected = { + "version": 1, + "config": [ + { + "subnets": [ + { + "control": "auto", + "dns_nameservers": ["8.8.8.8", "8.8.4.4"], + "netmask": "255.255.248.0", + "address": "62.181.8.174", + "type": "static", + "gateway": "62.181.8.1", + } + ], + "type": "physical", + "name": "eth0", + "mac_address": "00:15:5d:ff:0f:03", + }, + { + "subnets": [ + { + "control": "auto", + "dns_nameservers": ["9.9.9.9", "8.8.8.8"], + "netmask": "255.255.255.0", + "address": "10.209.78.11", + "type": "static", + "gateway": "10.209.78.1", + } + ], + "type": "physical", + "name": "eth1", + "mac_address": "00:15:5d:ff:0f:24", + }, + ], + } + self.assertTrue( + ds.generate_network_config(CLOUD_METADATA["netadp"]), expected + ) + + @mock.patch(DS_PATH + ".subp.subp") + def test_gratuitous_arp_run_standard_arping(self, m_subp): + """Test handle run arping & parameters.""" + items = [ + {"destination": "172.17.0.2", "source": "172.16.6.104"}, + { + "destination": "172.17.0.2", + "source": "172.16.6.104", + }, + ] + ds.gratuitous_arp(items, self._fetch_distro("ubuntu")) + self.assertEqual( + [ + mock.call( + ["arping", "-c", "2", "-S", "172.16.6.104", "172.17.0.2"] + ), + mock.call( + ["arping", "-c", "2", "-S", "172.16.6.104", "172.17.0.2"] + ), + ], + m_subp.call_args_list, + ) + + @mock.patch(DS_PATH + ".subp.subp") + def test_handle_rhel_like_arping(self, m_subp): + """Test handle on RHEL-like distros.""" + items = [ + { + "source": "172.16.6.104", + "destination": "172.17.0.2", + } + ] + ds.gratuitous_arp(items, self._fetch_distro("fedora")) + self.assertEqual( + [ + mock.call( + ["arping", "-c", "2", "-s", "172.16.6.104", "172.17.0.2"] + ) + ], + m_subp.call_args_list, + ) + + @mock.patch( + DS_PATH + ".subp.subp", side_effect=subp.ProcessExecutionError() + ) + def test_continue_on_arping_error(self, m_subp): + """Continue when command error""" + items = [ + {"destination": "172.17.0.2", "source": "172.16.6.104"}, + { + "destination": "172.17.0.2", + "source": "172.16.6.104", + }, + ] + ds.gratuitous_arp(items, self._fetch_distro("ubuntu")) + self.assertEqual( + [ + mock.call( + ["arping", "-c", "2", "-S", "172.16.6.104", "172.17.0.2"] + ), + mock.call( + ["arping", "-c", "2", "-S", "172.16.6.104", "172.17.0.2"] + ), + ], + m_subp.call_args_list, + ) + + +def populate_cloud_metadata(path, data): + populate_dir(path, {"cloud.json": json.dumps(data)}) + + +def populate_user_metadata(path, data): + populate_dir(path, {"user.data": data}) |