summaryrefslogtreecommitdiff
path: root/tests/common
diff options
context:
space:
mode:
Diffstat (limited to 'tests/common')
-rw-r--r--tests/common/osutil/test_bigip.py352
-rw-r--r--tests/common/osutil/test_default.py22
-rw-r--r--tests/common/test_version.py109
3 files changed, 483 insertions, 0 deletions
diff --git a/tests/common/osutil/test_bigip.py b/tests/common/osutil/test_bigip.py
new file mode 100644
index 0000000..4d1b006
--- /dev/null
+++ b/tests/common/osutil/test_bigip.py
@@ -0,0 +1,352 @@
+# Copyright 2016 F5 Networks Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+
+import os
+import socket
+import time
+
+import azurelinuxagent.common.osutil.bigip as osutil
+import azurelinuxagent.common.osutil.default as default
+import azurelinuxagent.common.utils.shellutil as shellutil
+
+from azurelinuxagent.common.exception import OSUtilError
+from tests.tools import *
+
+
+class TestBigIpOSUtil_wait_until_mcpd_is_initialized(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ @patch.object(logger, "info", return_value=None)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil._wait_until_mcpd_is_initialized(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, True)
+
+ # There are two logger calls in the mcpd wait function. The second
+ # occurs after mcpd is found to be "up"
+ self.assertEqual(args[0].call_count, 2)
+
+ @patch.object(shellutil, "run", return_value=1)
+ @patch.object(logger, "info", return_value=None)
+ @patch.object(time, "sleep", return_value=None)
+ def test_failure(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil._wait_until_mcpd_is_initialized,
+ osutil.BigIpOSUtil()
+ )
+
+
+class TestBigIpOSUtil_save_sys_config(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ @patch.object(logger, "error", return_value=None)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil._save_sys_config(osutil.BigIpOSUtil())
+ self.assertEqual(result, 0)
+ self.assertEqual(args[0].call_count, 0)
+
+ @patch.object(shellutil, "run", return_value=1)
+ @patch.object(logger, "error", return_value=None)
+ def test_failure(self, *args):
+ result = osutil.BigIpOSUtil._save_sys_config(osutil.BigIpOSUtil())
+ self.assertEqual(result, 1)
+ self.assertEqual(args[0].call_count, 1)
+
+
+class TestBigIpOSUtil_get_dhcp_pid(AgentTestCase):
+
+ @patch.object(shellutil, "run_get_output", return_value=(0, 8623))
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.get_dhcp_pid(osutil.BigIpOSUtil())
+ self.assertEqual(result, 8623)
+
+ @patch.object(shellutil, "run_get_output", return_value=(1, 'foo'))
+ def test_failure(self, *args):
+ result = osutil.BigIpOSUtil.get_dhcp_pid(osutil.BigIpOSUtil())
+ self.assertEqual(result, None)
+
+
+class TestBigIpOSUtil_useradd(AgentTestCase):
+
+ @patch.object(osutil.BigIpOSUtil, 'get_userentry', return_value=None)
+ @patch.object(shellutil, "run_get_output")
+ def test_success(self, *args):
+ args[0].return_value = (0, None)
+ result = osutil.BigIpOSUtil.useradd(
+ osutil.BigIpOSUtil(), 'foo', expiration=None
+ )
+ self.assertEqual(result, 0)
+
+ @patch.object(osutil.BigIpOSUtil, 'get_userentry', return_value=None)
+ def test_user_already_exists(self, *args):
+ args[0].return_value = 'admin'
+ result = osutil.BigIpOSUtil.useradd(
+ osutil.BigIpOSUtil(), 'admin', expiration=None
+ )
+ self.assertEqual(result, None)
+
+ @patch.object(shellutil, "run", return_value=1)
+ def test_failure(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil.useradd,
+ osutil.BigIpOSUtil(), 'foo', expiration=None
+ )
+
+
+class TestBigIpOSUtil_chpasswd(AgentTestCase):
+
+ @patch.object(shellutil, "run_get_output", return_value=(0, None))
+ @patch.object(osutil.BigIpOSUtil, 'get_userentry', return_value=True)
+ @patch.object(osutil.BigIpOSUtil, 'is_sys_user', return_value=False)
+ @patch.object(osutil.BigIpOSUtil, '_save_sys_config', return_value=None)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.chpasswd(
+ osutil.BigIpOSUtil(), 'admin', 'password', crypt_id=6, salt_len=10
+ )
+ self.assertEqual(result, 0)
+ self.assertEqual(args[0].call_count, 1)
+ self.assertEqual(args[0].call_count, 1)
+
+ @patch.object(osutil.BigIpOSUtil, 'is_sys_user', return_value=True)
+ def test_is_sys_user(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil.chpasswd,
+ osutil.BigIpOSUtil(), 'admin', 'password', crypt_id=6, salt_len=10
+ )
+
+ @patch.object(shellutil, "run_get_output", return_value=(1, None))
+ @patch.object(osutil.BigIpOSUtil, 'is_sys_user', return_value=False)
+ def test_failed_to_set_user_password(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil.chpasswd,
+ osutil.BigIpOSUtil(), 'admin', 'password', crypt_id=6, salt_len=10
+ )
+
+ @patch.object(shellutil, "run_get_output", return_value=(0, None))
+ @patch.object(osutil.BigIpOSUtil, 'is_sys_user', return_value=False)
+ @patch.object(osutil.BigIpOSUtil, 'get_userentry', return_value=None)
+ def test_failed_to_get_user_entry(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil.chpasswd,
+ osutil.BigIpOSUtil(), 'admin', 'password', crypt_id=6, salt_len=10
+ )
+
+
+class TestBigIpOSUtil_get_dvd_device(AgentTestCase):
+
+ @patch.object(os, "listdir", return_value=['tty1','cdrom0'])
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.get_dvd_device(
+ osutil.BigIpOSUtil(), '/dev'
+ )
+ self.assertEqual(result, '/dev/cdrom0')
+
+ @patch.object(os, "listdir", return_value=['foo', 'bar'])
+ def test_failure(self, *args):
+ self.assertRaises(
+ OSUtilError,
+ osutil.BigIpOSUtil.get_dvd_device,
+ osutil.BigIpOSUtil(), '/dev'
+ )
+
+
+class TestBigIpOSUtil_restart_ssh_service(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.restart_ssh_service(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, 0)
+
+
+class TestBigIpOSUtil_stop_agent_service(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.stop_agent_service(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, 0)
+
+
+class TestBigIpOSUtil_start_agent_service(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.start_agent_service(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, 0)
+
+
+class TestBigIpOSUtil_register_agent_service(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.register_agent_service(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, 0)
+
+
+class TestBigIpOSUtil_unregister_agent_service(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.unregister_agent_service(
+ osutil.BigIpOSUtil()
+ )
+ self.assertEqual(result, 0)
+
+
+class TestBigIpOSUtil_set_hostname(AgentTestCase):
+
+ @patch.object(os.path, "exists", return_value=False)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.set_hostname(
+ osutil.BigIpOSUtil(), None
+ )
+ self.assertEqual(args[0].call_count, 0)
+ self.assertEqual(result, None)
+
+
+class TestBigIpOSUtil_set_dhcp_hostname(AgentTestCase):
+
+ @patch.object(os.path, "exists", return_value=False)
+ def test_success(self, *args):
+ result = osutil.BigIpOSUtil.set_dhcp_hostname(
+ osutil.BigIpOSUtil(), None
+ )
+ self.assertEqual(args[0].call_count, 0)
+ self.assertEqual(result, None)
+
+
+class TestBigIpOSUtil_get_first_if(AgentTestCase):
+
+ @patch.object(osutil.BigIpOSUtil,
+ '_format_single_interface_name', return_value=b'eth0')
+ def test_success(self, *args):
+ ifname, ipaddr = osutil.BigIpOSUtil().get_first_if()
+ self.assertTrue(ifname.startswith('eth'))
+ self.assertTrue(ipaddr is not None)
+ try:
+ socket.inet_aton(ipaddr)
+ except socket.error:
+ self.fail("not a valid ip address")
+
+ @patch.object(osutil.BigIpOSUtil,
+ '_format_single_interface_name', return_value=b'loenp0s3')
+ def test_success(self, *args):
+ ifname, ipaddr = osutil.BigIpOSUtil().get_first_if()
+ self.assertFalse(ifname.startswith('eth'))
+ self.assertTrue(ipaddr is not None)
+ try:
+ socket.inet_aton(ipaddr)
+ except socket.error:
+ self.fail("not a valid ip address")
+
+
+class TestBigIpOSUtil_mount_dvd(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ @patch.object(time, "sleep", return_value=None)
+ @patch.object(osutil.BigIpOSUtil,
+ '_wait_until_mcpd_is_initialized', return_value=None)
+ @patch.object(default.DefaultOSUtil, 'mount_dvd', return_value=None)
+ def test_success(self, *args):
+ osutil.BigIpOSUtil.mount_dvd(
+ osutil.BigIpOSUtil(), max_retry=6, chk_err=True
+ )
+ self.assertEqual(args[0].call_count, 1)
+ self.assertEqual(args[1].call_count, 1)
+
+
+class TestBigIpOSUtil_set_admin_access_to_ip(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ @patch.object(osutil.BigIpOSUtil,
+ '_set_accept_admin_access_to_ip', return_value=None)
+ @patch.object(osutil.BigIpOSUtil,
+ '_set_drop_admin_access_to_ip', return_value=None)
+ def test_success(self, *args):
+ osutil.BigIpOSUtil.set_admin_access_to_ip(
+ osutil.BigIpOSUtil(), '192.168.10.10'
+ )
+ self.assertEqual(args[0].call_count, 1)
+ self.assertEqual(args[1].call_count, 1)
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_accept_access(self, *args):
+ osutil.BigIpOSUtil._set_accept_admin_access_to_ip(
+ osutil.BigIpOSUtil(), '192.168.10.10'
+ )
+ self.assertEqual(args[0].call_count, 2)
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_drop_access(self, *args):
+ osutil.BigIpOSUtil._set_drop_admin_access_to_ip(
+ osutil.BigIpOSUtil(), '192.168.10.10'
+ )
+ self.assertEqual(args[0].call_count, 2)
+
+
+class TestBigIpOSUtil_route_add(AgentTestCase):
+
+ @patch.object(shellutil, "run", return_value=0)
+ def test_success(self, *args):
+ osutil.BigIpOSUtil.route_add(
+ osutil.BigIpOSUtil(), '10.10.10.0', '255.255.255.0', '10.10.10.1'
+ )
+ self.assertEqual(args[0].call_count, 1)
+
+
+class TestBigIpOSUtil_device_for_ide_port(AgentTestCase):
+
+ @patch.object(time, "sleep", return_value=None)
+ @patch.object(os.path, "exists", return_value=False)
+ @patch.object(default.DefaultOSUtil,
+ 'device_for_ide_port', return_value=None)
+ def test_success_waiting(self, *args):
+ osutil.BigIpOSUtil.device_for_ide_port(
+ osutil.BigIpOSUtil(), '5'
+ )
+ self.assertEqual(args[0].call_count, 1)
+ self.assertEqual(args[1].call_count, 99)
+ self.assertEqual(args[2].call_count, 99)
+
+ @patch.object(time, "sleep", return_value=None)
+ @patch.object(os.path, "exists", return_value=True)
+ @patch.object(default.DefaultOSUtil,
+ 'device_for_ide_port', return_value=None)
+ def test_success_immediate(self, *args):
+ osutil.BigIpOSUtil.device_for_ide_port(
+ osutil.BigIpOSUtil(), '5'
+ )
+ self.assertEqual(args[0].call_count, 1)
+ self.assertEqual(args[1].call_count, 1)
+ self.assertEqual(args[2].call_count, 0)
+
+
+if __name__ == '__main__':
+ unittest.main() \ No newline at end of file
diff --git a/tests/common/osutil/test_default.py b/tests/common/osutil/test_default.py
index d9d00f6..d982b7e 100644
--- a/tests/common/osutil/test_default.py
+++ b/tests/common/osutil/test_default.py
@@ -142,5 +142,27 @@ class TestOSUtil(AgentTestCase):
self.assertTrue(endpoint is not None)
self.assertEqual(endpoint, "second")
+ def test_get_total_mem(self):
+ """
+ Validate the returned value matches to the one retrieved by invoking shell command
+ """
+ cmd = "grep MemTotal /proc/meminfo |awk '{print $2}'"
+ ret = shellutil.run_get_output(cmd)
+ if ret[0] == 0:
+ self.assertEqual(int(ret[1]) / 1024, get_osutil().get_total_mem())
+ else:
+ self.fail("Cannot retrieve total memory using shell command.")
+
+ def test_get_processor_cores(self):
+ """
+ Validate the returned value matches to the one retrieved by invoking shell command
+ """
+ cmd = "grep 'processor.*:' /proc/cpuinfo |wc -l"
+ ret = shellutil.run_get_output(cmd)
+ if ret[0] == 0:
+ self.assertEqual(int(ret[1]), get_osutil().get_processor_cores())
+ else:
+ self.fail("Cannot retrieve number of process cores using shell command.")
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/common/test_version.py b/tests/common/test_version.py
index 6a4dc38..306ea16 100644
--- a/tests/common/test_version.py
+++ b/tests/common/test_version.py
@@ -20,12 +20,14 @@ from __future__ import print_function
import copy
import glob
import json
+import mock
import os
import platform
import random
import subprocess
import sys
import tempfile
+import textwrap
import zipfile
from tests.protocol.mockwiredata import *
@@ -34,6 +36,7 @@ from tests.tools import *
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.fileutil as fileutil
+import azurelinuxagent.common.version as version
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
from azurelinuxagent.common.version import *
@@ -63,3 +66,109 @@ class TestCurrentAgentName(AgentTestCase):
self.assertEqual(agent, current_agent)
self.assertEqual(version, str(current_version))
return
+
+class TestGetF5Platforms(AgentTestCase):
+ def test_get_f5_platform_bigip_12_1_1(self):
+ version_file = textwrap.dedent("""
+ Product: BIG-IP
+ Version: 12.1.1
+ Build: 0.0.184
+ Sequence: 12.1.1.0.0.184.0
+ BaseBuild: 0.0.184
+ Edition: Final
+ Date: Thu Aug 11 17:09:01 PDT 2016
+ Built: 160811170901
+ Changelist: 1874858
+ JobID: 705993""")
+
+ mo = mock.mock_open(read_data=version_file)
+ with patch(open_patch(), mo):
+ platform = version.get_f5_platform()
+ self.assertTrue(platform[0] == 'bigip')
+ self.assertTrue(platform[1] == '12.1.1')
+ self.assertTrue(platform[2] == 'bigip')
+ self.assertTrue(platform[3] == 'BIG-IP')
+
+ def test_get_f5_platform_bigip_12_1_0_hf1(self):
+ version_file = textwrap.dedent("""
+ Product: BIG-IP
+ Version: 12.1.0
+ Build: 1.0.1447
+ Sequence: 12.1.0.1.0.1447.0
+ BaseBuild: 0.0.1434
+ Edition: Hotfix HF1
+ Date: Wed Jun 8 13:41:59 PDT 2016
+ Built: 160608134159
+ Changelist: 1773831
+ JobID: 673467""")
+
+ mo = mock.mock_open(read_data=version_file)
+ with patch(open_patch(), mo):
+ platform = version.get_f5_platform()
+ self.assertTrue(platform[0] == 'bigip')
+ self.assertTrue(platform[1] == '12.1.0')
+ self.assertTrue(platform[2] == 'bigip')
+ self.assertTrue(platform[3] == 'BIG-IP')
+
+ def test_get_f5_platform_bigip_12_0_0(self):
+ version_file = textwrap.dedent("""
+ Product: BIG-IP
+ Version: 12.0.0
+ Build: 0.0.606
+ Sequence: 12.0.0.0.0.606.0
+ BaseBuild: 0.0.606
+ Edition: Final
+ Date: Fri Aug 21 13:29:22 PDT 2015
+ Built: 150821132922
+ Changelist: 1486072
+ JobID: 536212""")
+
+ mo = mock.mock_open(read_data=version_file)
+ with patch(open_patch(), mo):
+ platform = version.get_f5_platform()
+ self.assertTrue(platform[0] == 'bigip')
+ self.assertTrue(platform[1] == '12.0.0')
+ self.assertTrue(platform[2] == 'bigip')
+ self.assertTrue(platform[3] == 'BIG-IP')
+
+ def test_get_f5_platform_iworkflow_2_0_1(self):
+ version_file = textwrap.dedent("""
+ Product: iWorkflow
+ Version: 2.0.1
+ Build: 0.0.9842
+ Sequence: 2.0.1.0.0.9842.0
+ BaseBuild: 0.0.9842
+ Edition: Final
+ Date: Sat Oct 1 22:52:08 PDT 2016
+ Built: 161001225208
+ Changelist: 1924048
+ JobID: 734712""")
+
+ mo = mock.mock_open(read_data=version_file)
+ with patch(open_patch(), mo):
+ platform = version.get_f5_platform()
+ self.assertTrue(platform[0] == 'iworkflow')
+ self.assertTrue(platform[1] == '2.0.1')
+ self.assertTrue(platform[2] == 'iworkflow')
+ self.assertTrue(platform[3] == 'iWorkflow')
+
+ def test_get_f5_platform_bigiq_5_1_0(self):
+ version_file = textwrap.dedent("""
+ Product: BIG-IQ
+ Version: 5.1.0
+ Build: 0.0.631
+ Sequence: 5.1.0.0.0.631.0
+ BaseBuild: 0.0.631
+ Edition: Final
+ Date: Thu Sep 15 19:55:43 PDT 2016
+ Built: 160915195543
+ Changelist: 1907534
+ JobID: 726344""")
+
+ mo = mock.mock_open(read_data=version_file)
+ with patch(open_patch(), mo):
+ platform = version.get_f5_platform()
+ self.assertTrue(platform[0] == 'bigiq')
+ self.assertTrue(platform[1] == '5.1.0')
+ self.assertTrue(platform[2] == 'bigiq')
+ self.assertTrue(platform[3] == 'BIG-IQ')