summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_azure_helper.py
diff options
context:
space:
mode:
authorzsdc <taras@vyos.io>2020-03-11 21:20:58 +0200
committerzsdc <taras@vyos.io>2020-03-11 21:22:23 +0200
commitc6627bc05a57645e6af8b9a5a67e452d9f37e487 (patch)
treeb754b3991e5e57a9ae9155819f73fa0cbd4be269 /tests/unittests/test_datasource/test_azure_helper.py
parentca9a4eb26b41c204d1bd3a15586b14a5dde950bb (diff)
parent13e82554728b1cb524438163784e5b955c7c5ed0 (diff)
downloadvyos-cloud-init-c6627bc05a57645e6af8b9a5a67e452d9f37e487.tar.gz
vyos-cloud-init-c6627bc05a57645e6af8b9a5a67e452d9f37e487.zip
Cloud-init: T2117: Updated to 20.1
- Merge 20.1 version from the Canonical repository - Removed unneeded changes in datasources (now only OVF datasource is not equal to upstream's version) - Adapted cc_vyos module to new Cloud-init version - Changed Jenkinsfile to use build scripts, provided by upstream
Diffstat (limited to 'tests/unittests/test_datasource/test_azure_helper.py')
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py117
1 files changed, 103 insertions, 14 deletions
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 26b2b93d..007df09f 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -1,11 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+import unittest2
from textwrap import dedent
from cloudinit.sources.helpers import azure as azure_helper
from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
+from cloudinit.util import load_file
from cloudinit.sources.helpers.azure import WALinuxAgentShim as wa_shim
GOAL_STATE_TEMPLATE = """\
@@ -65,12 +67,17 @@ class TestFindEndpoint(CiTestCase):
self.networkd_leases.return_value = None
def test_missing_file(self):
- self.assertRaises(ValueError, wa_shim.find_endpoint)
+ """wa_shim find_endpoint uses default endpoint if leasefile not found
+ """
+ self.assertEqual(wa_shim.find_endpoint(), "168.63.129.16")
def test_missing_special_azure_line(self):
+ """wa_shim find_endpoint uses default endpoint if leasefile is found
+ but does not contain DHCP Option 245 (whose value is the endpoint)
+ """
self.load_file.return_value = ''
self.dhcp_options.return_value = {'eth0': {'key': 'value'}}
- self.assertRaises(ValueError, wa_shim.find_endpoint)
+ self.assertEqual(wa_shim.find_endpoint(), "168.63.129.16")
@staticmethod
def _build_lease_content(encoded_address):
@@ -163,6 +170,25 @@ class TestGoalStateParsing(CiTestCase):
goal_state = self._get_goal_state(instance_id=instance_id)
self.assertEqual(instance_id, goal_state.instance_id)
+ def test_instance_id_byte_swap(self):
+ """Return true when previous_iid is byteswapped current_iid"""
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8"
+ self.assertTrue(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
+ def test_instance_id_no_byte_swap_same_instance_id(self):
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ self.assertFalse(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
+ def test_instance_id_no_byte_swap_diff_instance_id(self):
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "G0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ self.assertFalse(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
def test_certificates_xml_parsed_and_fetched_correctly(self):
http_client = mock.MagicMock()
certificates_url = 'TestCertificatesUrl'
@@ -205,8 +231,10 @@ class TestAzureEndpointHttpClient(CiTestCase):
response = client.get(url, secure=False)
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=self.regular_headers),
- self.read_file_or_url.call_args)
+ self.assertEqual(
+ mock.call(url, headers=self.regular_headers, retries=10,
+ timeout=5),
+ self.read_file_or_url.call_args)
def test_secure_get(self):
url = 'MyTestUrl'
@@ -220,8 +248,10 @@ class TestAzureEndpointHttpClient(CiTestCase):
response = client.get(url, secure=True)
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=expected_headers),
- self.read_file_or_url.call_args)
+ self.assertEqual(
+ mock.call(url, headers=expected_headers, retries=10,
+ timeout=5),
+ self.read_file_or_url.call_args)
def test_post(self):
data = mock.MagicMock()
@@ -231,7 +261,8 @@ class TestAzureEndpointHttpClient(CiTestCase):
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
self.assertEqual(
- mock.call(url, data=data, headers=self.regular_headers),
+ mock.call(url, data=data, headers=self.regular_headers, retries=10,
+ timeout=5),
self.read_file_or_url.call_args)
def test_post_with_extra_headers(self):
@@ -243,7 +274,8 @@ class TestAzureEndpointHttpClient(CiTestCase):
expected_headers = self.regular_headers.copy()
expected_headers.update(extra_headers)
self.assertEqual(
- mock.call(mock.ANY, data=mock.ANY, headers=expected_headers),
+ mock.call(mock.ANY, data=mock.ANY, headers=expected_headers,
+ retries=10, timeout=5),
self.read_file_or_url.call_args)
@@ -289,6 +321,50 @@ class TestOpenSSLManager(CiTestCase):
self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
+class TestOpenSSLManagerActions(CiTestCase):
+
+ def setUp(self):
+ super(TestOpenSSLManagerActions, self).setUp()
+
+ self.allowed_subp = True
+
+ def _data_file(self, name):
+ path = 'tests/data/azure'
+ return os.path.join(path, name)
+
+ @unittest2.skip("todo move to cloud_test")
+ def test_pubkey_extract(self):
+ cert = load_file(self._data_file('pubkey_extract_cert'))
+ good_key = load_file(self._data_file('pubkey_extract_ssh_key'))
+ sslmgr = azure_helper.OpenSSLManager()
+ key = sslmgr._get_ssh_key_from_cert(cert)
+ self.assertEqual(good_key, key)
+
+ good_fingerprint = '073E19D14D1C799224C6A0FD8DDAB6A8BF27D473'
+ fingerprint = sslmgr._get_fingerprint_from_cert(cert)
+ self.assertEqual(good_fingerprint, fingerprint)
+
+ @unittest2.skip("todo move to cloud_test")
+ @mock.patch.object(azure_helper.OpenSSLManager, '_decrypt_certs_from_xml')
+ def test_parse_certificates(self, mock_decrypt_certs):
+ """Azure control plane puts private keys as well as certificates
+ into the Certificates XML object. Make sure only the public keys
+ from certs are extracted and that fingerprints are converted to
+ the form specified in the ovf-env.xml file.
+ """
+ cert_contents = load_file(self._data_file('parse_certificates_pem'))
+ fingerprints = load_file(self._data_file(
+ 'parse_certificates_fingerprints')
+ ).splitlines()
+ mock_decrypt_certs.return_value = cert_contents
+ sslmgr = azure_helper.OpenSSLManager()
+ keys_by_fp = sslmgr.parse_certificates('')
+ for fp in keys_by_fp.keys():
+ self.assertIn(fp, fingerprints)
+ for fp in fingerprints:
+ self.assertIn(fp, keys_by_fp)
+
+
class TestWALinuxAgentShim(CiTestCase):
def setUp(self):
@@ -329,18 +405,31 @@ class TestWALinuxAgentShim(CiTestCase):
def test_certificates_used_to_determine_public_keys(self):
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ """if register_with_azure_and_fetch_data() isn't passed some info about
+ the user's public keys, there's no point in even trying to parse
+ the certificates
+ """
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'},
+ {'fingerprint': 'fp3', 'path': 'path3', 'value': ''}]
+ certs = {'fp1': 'expected-key',
+ 'fp2': 'should-not-be-found',
+ 'fp3': 'expected-no-value-key',
+ }
+ sslmgr = self.OpenSSLManager.return_value
+ sslmgr.parse_certificates.return_value = certs
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual(
[mock.call(self.GoalState.return_value.certificates_xml)],
- self.OpenSSLManager.return_value.parse_certificates.call_args_list)
- self.assertEqual(
- self.OpenSSLManager.return_value.parse_certificates.return_value,
- data['public-keys'])
+ sslmgr.parse_certificates.call_args_list)
+ self.assertIn('expected-key', data['public-keys'])
+ self.assertIn('expected-no-value-key', data['public-keys'])
+ self.assertNotIn('should-not-be-found', data['public-keys'])
def test_absent_certificates_produces_empty_public_keys(self):
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'}]
self.GoalState.return_value.certificates_xml = None
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual([], data['public-keys'])
def test_correct_url_used_for_report_ready(self):