summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_azure_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_azure_helper.py')
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py71
1 files changed, 65 insertions, 6 deletions
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 26b2b93d..02556165 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -1,11 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+import unittest2
from textwrap import dedent
from cloudinit.sources.helpers import azure as azure_helper
from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
+from cloudinit.util import load_file
from cloudinit.sources.helpers.azure import WALinuxAgentShim as wa_shim
GOAL_STATE_TEMPLATE = """\
@@ -289,6 +291,50 @@ class TestOpenSSLManager(CiTestCase):
self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
+class TestOpenSSLManagerActions(CiTestCase):
+
+ def setUp(self):
+ super(TestOpenSSLManagerActions, self).setUp()
+
+ self.allowed_subp = True
+
+ def _data_file(self, name):
+ path = 'tests/data/azure'
+ return os.path.join(path, name)
+
+ @unittest2.skip("todo move to cloud_test")
+ def test_pubkey_extract(self):
+ cert = load_file(self._data_file('pubkey_extract_cert'))
+ good_key = load_file(self._data_file('pubkey_extract_ssh_key'))
+ sslmgr = azure_helper.OpenSSLManager()
+ key = sslmgr._get_ssh_key_from_cert(cert)
+ self.assertEqual(good_key, key)
+
+ good_fingerprint = '073E19D14D1C799224C6A0FD8DDAB6A8BF27D473'
+ fingerprint = sslmgr._get_fingerprint_from_cert(cert)
+ self.assertEqual(good_fingerprint, fingerprint)
+
+ @unittest2.skip("todo move to cloud_test")
+ @mock.patch.object(azure_helper.OpenSSLManager, '_decrypt_certs_from_xml')
+ def test_parse_certificates(self, mock_decrypt_certs):
+ """Azure control plane puts private keys as well as certificates
+ into the Certificates XML object. Make sure only the public keys
+ from certs are extracted and that fingerprints are converted to
+ the form specified in the ovf-env.xml file.
+ """
+ cert_contents = load_file(self._data_file('parse_certificates_pem'))
+ fingerprints = load_file(self._data_file(
+ 'parse_certificates_fingerprints')
+ ).splitlines()
+ mock_decrypt_certs.return_value = cert_contents
+ sslmgr = azure_helper.OpenSSLManager()
+ keys_by_fp = sslmgr.parse_certificates('')
+ for fp in keys_by_fp.keys():
+ self.assertIn(fp, fingerprints)
+ for fp in fingerprints:
+ self.assertIn(fp, keys_by_fp)
+
+
class TestWALinuxAgentShim(CiTestCase):
def setUp(self):
@@ -329,18 +375,31 @@ class TestWALinuxAgentShim(CiTestCase):
def test_certificates_used_to_determine_public_keys(self):
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ """if register_with_azure_and_fetch_data() isn't passed some info about
+ the user's public keys, there's no point in even trying to parse
+ the certificates
+ """
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'},
+ {'fingerprint': 'fp3', 'path': 'path3', 'value': ''}]
+ certs = {'fp1': 'expected-key',
+ 'fp2': 'should-not-be-found',
+ 'fp3': 'expected-no-value-key',
+ }
+ sslmgr = self.OpenSSLManager.return_value
+ sslmgr.parse_certificates.return_value = certs
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual(
[mock.call(self.GoalState.return_value.certificates_xml)],
- self.OpenSSLManager.return_value.parse_certificates.call_args_list)
- self.assertEqual(
- self.OpenSSLManager.return_value.parse_certificates.return_value,
- data['public-keys'])
+ sslmgr.parse_certificates.call_args_list)
+ self.assertIn('expected-key', data['public-keys'])
+ self.assertIn('expected-no-value-key', data['public-keys'])
+ self.assertNotIn('should-not-be-found', data['public-keys'])
def test_absent_certificates_produces_empty_public_keys(self):
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'}]
self.GoalState.return_value.certificates_xml = None
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual([], data['public-keys'])
def test_correct_url_used_for_report_ready(self):