diff options
| author | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
|---|---|---|
| committer | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
| commit | a950059053f7394acfb453cc0d8194aa3dc721fa (patch) | |
| tree | eb0acf278f649b5d1417e18e34d728efcd16e745 /smoketest/scripts/cli/test_service_webproxy.py | |
| parent | f0815f3e9b212f424f5adb0c572a71119ad4a8a0 (diff) | |
| download | vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.tar.gz vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.zip | |
T6732: added same as vyos 1x
Diffstat (limited to 'smoketest/scripts/cli/test_service_webproxy.py')
| -rw-r--r-- | smoketest/scripts/cli/test_service_webproxy.py | 302 |
1 files changed, 302 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_service_webproxy.py b/smoketest/scripts/cli/test_service_webproxy.py new file mode 100644 index 0000000..2b3f6d2 --- /dev/null +++ b/smoketest/scripts/cli/test_service_webproxy.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020-2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSessionError +from vyos.utils.process import cmd +from vyos.utils.process import process_named_running +from vyos.utils.file import read_file + +PROCESS_NAME = 'squid' +PROXY_CONF = '/etc/squid/squid.conf' +base_path = ['service', 'webproxy'] +listen_if = 'dum3632' +listen_ip = '192.0.2.1' + +class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + # call base-classes classmethod + super(TestServiceWebProxy, cls).setUpClass() + # create a test interfaces + cls.cli_set(cls, ['interfaces', 'dummy', listen_if, 'address', listen_ip + '/32']) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, ['interfaces', 'dummy', listen_if]) + super(TestServiceWebProxy, cls).tearDownClass() + + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + + def test_01_basic_proxy(self): + default_cache = '100' + self.cli_set(base_path + ['listen-address', listen_ip]) + + # commit changes + self.cli_commit() + + config = read_file(PROXY_CONF) + self.assertIn(f'http_port {listen_ip}:3128 intercept', config) + self.assertIn(f'cache_dir ufs /var/spool/squid {default_cache} 16 256', config) + self.assertIn(f'access_log /var/log/squid/access.log squid', config) + + # ACL verification + self.assertIn(f'acl net src all', config) + self.assertIn(f'acl SSL_ports port 443', config) + + safe_ports = ['80', '21', '443', '873', '70', '210', '1025-65535', '280', + '488', '591', '777'] + for port in safe_ports: + self.assertIn(f'acl Safe_ports port {port}', config) + self.assertIn(f'acl CONNECT method CONNECT', config) + + self.assertIn(f'http_access allow manager localhost', config) + self.assertIn(f'http_access deny manager', config) + self.assertIn(f'http_access deny !Safe_ports', config) + self.assertIn(f'http_access deny CONNECT !SSL_ports', config) + self.assertIn(f'http_access allow localhost', config) + self.assertIn(f'http_access allow net', config) + self.assertIn(f'http_access deny all', config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_02_advanced_proxy(self): + domain = '.vyos.io' + cache_size = '900' + port = '8080' + min_obj_size = '128' + max_obj_size = '8192' + block_mine = ['application/pdf', 'application/x-sh'] + body_max_size = '4096' + safe_port = '88' + ssl_safe_port = '8443' + + self.cli_set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['append-domain', domain]) + self.cli_set(base_path + ['default-port', port]) + self.cli_set(base_path + ['cache-size', cache_size]) + self.cli_set(base_path + ['disable-access-log']) + + self.cli_set(base_path + ['minimum-object-size', min_obj_size]) + self.cli_set(base_path + ['maximum-object-size', max_obj_size]) + + self.cli_set(base_path + ['outgoing-address', listen_ip]) + + for mime in block_mine: + self.cli_set(base_path + ['reply-block-mime', mime]) + + self.cli_set(base_path + ['reply-body-max-size', body_max_size]) + + self.cli_set(base_path + ['safe-ports', safe_port]) + self.cli_set(base_path + ['ssl-safe-ports', ssl_safe_port]) + + # commit changes + self.cli_commit() + + config = read_file(PROXY_CONF) + self.assertIn(f'http_port {listen_ip}:{port} intercept', config) + self.assertIn(f'append_domain {domain}', config) + self.assertIn(f'cache_dir ufs /var/spool/squid {cache_size} 16 256', config) + self.assertIn(f'access_log none', config) + self.assertIn(f'minimum_object_size {min_obj_size} KB', config) + self.assertIn(f'maximum_object_size {max_obj_size} KB', config) + self.assertIn(f'tcp_outgoing_address {listen_ip}', config) + + for mime in block_mine: + self.assertIn(f'acl BLOCK_MIME rep_mime_type {mime}', config) + self.assertIn(f'http_reply_access deny BLOCK_MIME', config) + + self.assertIn(f'reply_body_max_size {body_max_size} KB', config) + + self.assertIn(f'acl Safe_ports port {safe_port}', config) + self.assertIn(f'acl SSL_ports port {ssl_safe_port}', config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_03_ldap_proxy_auth(self): + auth_children = '20' + cred_ttl = '120' + realm = 'VyOS Webproxy' + ldap_base_dn = 'DC=vyos,DC=net' + ldap_server = 'ldap.vyos.net' + ldap_bind_dn = f'CN=proxyuser,CN=Users,{ldap_base_dn}' + ldap_password = 'VyOS12345' + ldap_attr = 'cn' + ldap_filter = '(cn=%s)' + + self.cli_set(base_path + ['listen-address', listen_ip, 'disable-transparent']) + self.cli_set(base_path + ['authentication', 'children', auth_children]) + self.cli_set(base_path + ['authentication', 'credentials-ttl', cred_ttl]) + + self.cli_set(base_path + ['authentication', 'realm', realm]) + self.cli_set(base_path + ['authentication', 'method', 'ldap']) + # check validate() - LDAP authentication is enabled, but server not set + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'server', ldap_server]) + + # check validate() - LDAP password can not be set when bind-dn is not define + self.cli_set(base_path + ['authentication', 'ldap', 'password', ldap_password]) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'bind-dn', ldap_bind_dn]) + + # check validate() - LDAP base-dn must be set + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'base-dn', ldap_base_dn]) + + self.cli_set(base_path + ['authentication', 'ldap', 'username-attribute', ldap_attr]) + self.cli_set(base_path + ['authentication', 'ldap', 'filter-expression', ldap_filter]) + self.cli_set(base_path + ['authentication', 'ldap', 'use-ssl']) + + # commit changes + self.cli_commit() + + config = read_file(PROXY_CONF) + self.assertIn(f'http_port {listen_ip}:3128', config) # disable-transparent + + # Now verify LDAP settings + self.assertIn(f'auth_param basic children {auth_children}', config) + self.assertIn(f'auth_param basic credentialsttl {cred_ttl} minute', config) + self.assertIn(f'auth_param basic realm "{realm}"', config) + self.assertIn(f'auth_param basic program /usr/lib/squid/basic_ldap_auth -v 3 -b "{ldap_base_dn}" -D "{ldap_bind_dn}" -w "{ldap_password}" -f "{ldap_filter}" -u "{ldap_attr}" -p 389 -ZZ -R -h "{ldap_server}"', config) + self.assertIn(f'acl auth proxy_auth REQUIRED', config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_04_cache_peer(self): + self.cli_set(base_path + ['listen-address', listen_ip]) + + cache_peers = { + 'foo' : '192.0.2.1', + 'bar' : '192.0.2.2', + 'baz' : '192.0.2.3', + } + for peer in cache_peers: + self.cli_set(base_path + ['cache-peer', peer, 'address', cache_peers[peer]]) + if peer == 'baz': + self.cli_set(base_path + ['cache-peer', peer, 'type', 'sibling']) + + # commit changes + self.cli_commit() + + config = read_file(PROXY_CONF) + self.assertIn('never_direct allow all', config) + + for peer in cache_peers: + address = cache_peers[peer] + if peer == 'baz': + self.assertIn(f'cache_peer {address} sibling 3128 0 no-query default', config) + else: + self.assertIn(f'cache_peer {address} parent 3128 0 no-query default', config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_05_basic_squidguard(self): + # Create very basic local SquidGuard blacklist and verify its contents + sg_db_dir = '/opt/vyatta/etc/config/url-filtering/squidguard/db' + + default_cache = '100' + local_block = ['192.0.0.1', '10.0.0.1', 'block.vyos.net'] + local_block_url = ['foo.com/bar.html', 'bar.com/foo.htm'] + local_block_pattern = ['porn', 'cisco', 'juniper'] + local_ok = ['10.0.0.0', 'vyos.net'] + local_ok_url = ['vyos.net', 'vyos.io'] + + self.cli_set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'log', 'all']) + + for block in local_block: + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block', block]) + for ok in local_ok: + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok', ok]) + for url in local_block_url: + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-url', url]) + for url in local_ok_url: + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok-url', url]) + for pattern in local_block_pattern: + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-keyword', pattern]) + + # commit changes + self.cli_commit() + + # Check regular Squid config + config = read_file(PROXY_CONF) + self.assertIn(f'http_port {listen_ip}:3128 intercept', config) + + self.assertIn(f'url_rewrite_program /usr/bin/squidGuard -c /etc/squidguard/squidGuard.conf', config) + self.assertIn(f'url_rewrite_children 8', config) + + # Check SquidGuard config + sg_config = read_file('/etc/squidguard/squidGuard.conf') + self.assertIn(f'log blacklist.log', sg_config) + + # The following are rewrite strings to force safe/strict search for + # several popular search engines. + self.assertIn(r's@(.*\.google\..*/(custom|search|images|groups|news)?.*q=.*)@\1\&safe=active@i', sg_config) + self.assertIn(r's@(.*\..*/yandsearch?.*text=.*)@\1\&fyandex=1@i', sg_config) + self.assertIn(r's@(.*\.yahoo\..*/search.*p=.*)@\1\&vm=r@i', sg_config) + self.assertIn(r's@(.*\.live\..*/.*q=.*)@\1\&adlt=strict@i', sg_config) + self.assertIn(r's@(.*\.msn\..*/.*q=.*)@\1\&adlt=strict@i', sg_config) + self.assertIn(r's@(.*\.bing\..*/search.*q=.*)@\1\&adlt=strict@i', sg_config) + + # URL lists + self.assertIn(r'dest local-ok-default {', sg_config) + self.assertIn(f'domainlist local-ok-default/domains', sg_config) + self.assertIn(r'dest local-ok-url-default {', sg_config) + self.assertIn(f'urllist local-ok-url-default/urls', sg_config) + + # Redirect - default value + self.assertIn(f'redirect 302:http://block.vyos.net', sg_config) + + # local-block database + tmp = cmd(f'sudo cat {sg_db_dir}/local-block-default/domains') + for block in local_block: + self.assertIn(f'{block}', tmp) + + tmp = cmd(f'sudo cat {sg_db_dir}/local-block-url-default/urls') + for url in local_block_url: + self.assertIn(f'{url}', tmp) + + tmp = cmd(f'sudo cat {sg_db_dir}/local-block-keyword-default/expressions') + for pattern in local_block_pattern: + self.assertIn(f'{pattern}', tmp) + + # local-ok database + tmp = cmd(f'sudo cat {sg_db_dir}/local-ok-default/domains') + for ok in local_ok: + self.assertIn(f'{ok}', tmp) + + tmp = cmd(f'sudo cat {sg_db_dir}/local-ok-url-default/urls') + for url in local_ok_url: + self.assertIn(f'{url}', tmp) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |
