diff options
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 12 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_firewall.py | 172 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_policy_route.py | 106 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_protocols_nhrp.py | 9 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_monitoring_telegraf.py | 65 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_snmp.py | 102 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_conntrack.py | 24 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_flow-accounting.py | 15 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vpn_ipsec.py | 32 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_zone_policy.py | 63 |
10 files changed, 545 insertions, 55 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index bc0a6c128..9de961249 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -607,11 +607,11 @@ class BasicInterfaceTest: self.cli_commit() for interface in self._interfaces: - base_options = f'-A FORWARD -o {interface} -p tcp -m tcp --tcp-flags SYN,RST SYN' - out = cmd('sudo iptables-save -t mangle') + base_options = f'oifname "{interface}"' + out = cmd('sudo nft list chain raw VYOS_TCP_MSS') for line in out.splitlines(): if line.startswith(base_options): - self.assertIn(f'--set-mss {mss}', line) + self.assertIn(f'tcp option maxseg size set {mss}', line) tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds @@ -662,11 +662,11 @@ class BasicInterfaceTest: self.cli_commit() for interface in self._interfaces: - base_options = f'-A FORWARD -o {interface} -p tcp -m tcp --tcp-flags SYN,RST SYN' - out = cmd('sudo ip6tables-save -t mangle') + base_options = f'oifname "{interface}"' + out = cmd('sudo nft list chain ip6 raw VYOS_TCP_MSS') for line in out.splitlines(): if line.startswith(base_options): - self.assertIn(f'--set-mss {mss}', line) + self.assertIn(f'tcp option maxseg size set {mss}', line) proc_base = f'/proc/sys/net/ipv6/conf/{interface}' diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py new file mode 100755 index 000000000..5f728f0cd --- /dev/null +++ b/smoketest/scripts/cli/test_firewall.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from glob import glob + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.util import cmd + +sysfs_config = { + 'all_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_all', 'default': '0', 'test_value': 'disable'}, + 'broadcast_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_broadcasts', 'default': '1', 'test_value': 'enable'}, + 'ip_src_route': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_source_route', 'default': '0', 'test_value': 'enable'}, + 'ipv6_receive_redirects': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, + 'ipv6_src_route': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_source_route', 'default': '-1', 'test_value': 'enable'}, + 'log_martians': {'sysfs': '/proc/sys/net/ipv4/conf/all/log_martians', 'default': '1', 'test_value': 'disable'}, + 'receive_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, + 'send_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/send_redirects', 'default': '1', 'test_value': 'disable'}, + 'syn_cookies': {'sysfs': '/proc/sys/net/ipv4/tcp_syncookies', 'default': '1', 'test_value': 'disable'}, + 'twa_hazards_protection': {'sysfs': '/proc/sys/net/ipv4/tcp_rfc1337', 'default': '0', 'test_value': 'enable'} +} + +class TestFirewall(VyOSUnitTestSHIM.TestCase): + def setUp(self): + self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) + + def tearDown(self): + self.cli_delete(['interfaces', 'ethernet', 'eth0']) + self.cli_commit() + self.cli_delete(['firewall']) + self.cli_commit() + + def test_groups(self): + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) + self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) + self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp']) + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) + + self.cli_commit() + + nftables_search = [ + ['iifname "eth0"', 'jump smoketest'], + ['ip saddr { 172.16.99.0/24 }', 'ip daddr 172.16.10.10', 'tcp dport { 53, 123 }', 'return'], + ] + + nftables_output = cmd('sudo nft list table ip filter') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + def test_basic_rules(self): + self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'protocol', 'tcp_udp']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'destination', 'port', '8888']) + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) + + self.cli_commit() + + nftables_search = [ + ['iifname "eth0"', 'jump smoketest'], + ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'return'], + ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'reject'], + ['smoketest default-action', 'drop'] + ] + + nftables_output = cmd('sudo nft list table ip filter') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + def test_basic_rules_ipv6(self): + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'default-action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'source', 'address', '2002::1']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'destination', 'address', '2002::1:1']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'protocol', 'tcp_udp']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'destination', 'port', '8888']) + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'ipv6-name', 'v6-smoketest']) + + self.cli_commit() + + nftables_search = [ + ['iifname "eth0"', 'jump v6-smoketest'], + ['saddr 2002::1', 'daddr 2002::1:1', 'return'], + ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'reject'], + ['smoketest default-action', 'drop'] + ] + + nftables_output = cmd('sudo nft list table ip6 filter') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + def test_state_policy(self): + self.cli_set(['firewall', 'state-policy', 'established', 'action', 'accept']) + self.cli_set(['firewall', 'state-policy', 'related', 'action', 'accept']) + self.cli_set(['firewall', 'state-policy', 'invalid', 'action', 'drop']) + + self.cli_commit() + + chains = { + 'ip filter': ['VYOS_FW_IN', 'VYOS_FW_OUTPUT', 'VYOS_FW_LOCAL'], + 'ip6 filter': ['VYOS_FW6_IN', 'VYOS_FW6_OUTPUT', 'VYOS_FW6_LOCAL'] + } + + for table in ['ip filter', 'ip6 filter']: + for chain in chains[table]: + nftables_output = cmd(f'sudo nft list chain {table} {chain}') + self.assertTrue('jump VYOS_STATE_POLICY' in nftables_output) + + def test_sysfs(self): + for name, conf in sysfs_config.items(): + paths = glob(conf['sysfs']) + for path in paths: + with open(path, 'r') as f: + self.assertEqual(f.read().strip(), conf['default'], msg=path) + + self.cli_set(['firewall', name.replace("_", "-"), conf['test_value']]) + + self.cli_commit() + + for name, conf in sysfs_config.items(): + paths = glob(conf['sysfs']) + for path in paths: + with open(path, 'r') as f: + self.assertNotEqual(f.read().strip(), conf['default'], msg=path) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py new file mode 100755 index 000000000..70a234187 --- /dev/null +++ b/smoketest/scripts/cli/test_policy_route.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.util import cmd + +mark = '100' +table_mark_offset = 0x7fffffff +table_id = '101' + +class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): + def setUp(self): + self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) + self.cli_set(['protocols', 'static', 'table', '101', 'route', '0.0.0.0/0', 'interface', 'eth0']) + + def tearDown(self): + self.cli_delete(['interfaces', 'ethernet', 'eth0']) + self.cli_delete(['policy', 'route']) + self.cli_delete(['policy', 'ipv6-route']) + self.cli_commit() + + def test_pbr_mark(self): + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'policy', 'route', 'smoketest']) + + self.cli_commit() + + mark_hex = "{0:#010x}".format(int(mark)) + + nftables_search = [ + ['iifname "eth0"', 'jump VYOS_PBR_smoketest'], + ['ip daddr 172.16.10.10', 'ip saddr 172.16.20.10', 'meta mark set ' + mark_hex], + ] + + nftables_output = cmd('sudo nft list table ip mangle') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + def test_pbr_table(self): + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'port', '8888']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'table', table_id]) + + self.cli_set(['interfaces', 'ethernet', 'eth0', 'policy', 'route', 'smoketest']) + + self.cli_commit() + + mark_hex = "{0:#010x}".format(table_mark_offset - int(table_id)) + + nftables_search = [ + ['iifname "eth0"', 'jump VYOS_PBR_smoketest'], + ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'meta mark set ' + mark_hex] + ] + + nftables_output = cmd('sudo nft list table ip mangle') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + ip_rule_search = [ + ['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id] + ] + + ip_rule_output = cmd('ip rule show') + + for search in ip_rule_search: + matched = False + for line in ip_rule_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_nhrp.py b/smoketest/scripts/cli/test_protocols_nhrp.py index aa0ac268d..40b19fec7 100755 --- a/smoketest/scripts/cli/test_protocols_nhrp.py +++ b/smoketest/scripts/cli/test_protocols_nhrp.py @@ -18,6 +18,7 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.firewall import find_nftables_rule from vyos.util import call, process_named_running, read_file tunnel_path = ['interfaces', 'tunnel'] @@ -91,6 +92,14 @@ class TestProtocolsNHRP(VyOSUnitTestSHIM.TestCase): for line in opennhrp_lines: self.assertIn(line, tmp_opennhrp_conf) + firewall_matches = [ + 'ip protocol gre', + 'ip saddr 192.0.2.1', + 'ip daddr 224.0.0.0/4', + 'comment "VYOS_NHRP_tun100"' + ] + + self.assertTrue(find_nftables_rule('ip filter', 'VYOS_FW_OUTPUT', firewall_matches) is not None) self.assertTrue(process_named_running('opennhrp')) if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_service_monitoring_telegraf.py b/smoketest/scripts/cli/test_service_monitoring_telegraf.py new file mode 100755 index 000000000..b857926e2 --- /dev/null +++ b/smoketest/scripts/cli/test_service_monitoring_telegraf.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos.util import process_named_running +from vyos.util import read_file + +PROCESS_NAME = 'telegraf' +TELEGRAF_CONF = '/run/telegraf/vyos-telegraf.conf' +base_path = ['service', 'monitoring', 'telegraf'] +org = 'log@in.local' +token = 'GuRJc12tIzfjnYdKRAIYbxdWd2aTpOT9PVYNddzDnFV4HkAcD7u7-kndTFXjGuXzJN6TTxmrvPODB4mnFcseDV==' +port = '8888' +url = 'https://foo.local' +bucket = 'main' +inputs = ['cpu', 'disk', 'mem', 'net', 'system', 'kernel', 'interrupts', 'syslog'] + +class TestMonitoringTelegraf(VyOSUnitTestSHIM.TestCase): + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + + def test_01_basic_config(self): + self.cli_set(base_path + ['authentication', 'organization', org]) + self.cli_set(base_path + ['authentication', 'token', token]) + self.cli_set(base_path + ['port', port]) + self.cli_set(base_path + ['url', url]) + + # commit changes + self.cli_commit() + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + config = read_file(TELEGRAF_CONF) + + # Check telegraf config + self.assertIn(f'organization = "{org}"', config) + self.assertIn(token, config) + self.assertIn(f'urls = ["{url}:{port}"]', config) + self.assertIn(f'bucket = "{bucket}"', config) + + for input in inputs: + self.assertIn(input, config) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index 058835c72..fc24fd54e 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -22,14 +22,25 @@ from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.template import is_ipv4 from vyos.template import address_from_cidr +from vyos.util import call +from vyos.util import DEVNULL from vyos.util import read_file from vyos.util import process_named_running +from vyos.version import get_version_data PROCESS_NAME = 'snmpd' SNMPD_CONF = '/etc/snmp/snmpd.conf' base_path = ['service', 'snmp'] +snmpv3_group = 'default_group' +snmpv3_view = 'default_view' +snmpv3_view_oid = '1' +snmpv3_user = 'vyos' +snmpv3_auth_pw = 'vyos12345678' +snmpv3_priv_pw = 'vyos87654321' +snmpv3_engine_id = '000000000000000000000002' + def get_config_value(key): tmp = read_file(SNMPD_CONF) tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) @@ -45,13 +56,22 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): cls.cli_delete(cls, base_path) def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + # delete testing SNMP config self.cli_delete(base_path) self.cli_commit() + # Check for running process + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_snmp_basic(self): dummy_if = 'dum7312' dummy_addr = '100.64.0.1/32' + contact = 'maintainers@vyos.io' + location = 'QEMU' + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', dummy_addr]) # Check if SNMP can be configured and service runs @@ -71,8 +91,8 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): for addr in listen: self.cli_set(base_path + ['listen-address', addr, 'port', port]) - self.cli_set(base_path + ['contact', 'maintainers@vyos.io']) - self.cli_set(base_path + ['location', 'qemu']) + self.cli_set(base_path + ['contact', contact]) + self.cli_set(base_path + ['location', location]) self.cli_commit() @@ -82,7 +102,6 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): config = get_config_value('agentaddress') expected = 'unix:/run/snmpd.socket' self.assertIn(expected, config) - for addr in listen: if is_ipv4(addr): expected = f'udp:{addr}:{port}' @@ -90,6 +109,16 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): expected = f'udp6:[{addr}]:{port}' self.assertIn(expected, config) + config = get_config_value('sysDescr') + version_data = get_version_data() + self.assertEqual('VyOS ' + version_data['version'], config) + + config = get_config_value('SysContact') + self.assertEqual(contact, config) + + config = get_config_value('SysLocation') + self.assertEqual(location, config) + # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) self.cli_delete(['interfaces', 'dummy', dummy_if]) @@ -98,8 +127,7 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): def test_snmpv3_sha(self): # Check if SNMPv3 can be configured with SHA authentication # and service runs - - self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.cli_set(base_path + ['v3', 'engineid', snmpv3_engine_id]) self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) # check validate() - a view must be created before this can be committed with self.assertRaises(ConfigSessionError): @@ -109,46 +137,52 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default']) # create user - self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'plaintext-password', snmpv3_auth_pw]) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'type', 'sha']) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'plaintext-password', snmpv3_priv_pw]) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'type', 'aes']) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'group', 'default']) self.cli_commit() # commit will alter the CLI values - check if they have been updated: hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' - tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + tmp = self._session.show_config(base_path + ['v3', 'user', snmpv3_user, 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) - tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + hashed_password = '54705c8de9e81fdf61ad7ac044fa8fe611ddff6b' + tmp = self._session.show_config(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) # TODO: read in config file and check values - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # Try SNMPv3 connection + tmp = call(f'snmpwalk -v 3 -u {snmpv3_user} -a SHA -A {snmpv3_auth_pw} -x AES -X {snmpv3_priv_pw} -l authPriv 127.0.0.1', stdout=DEVNULL) + self.assertEqual(tmp, 0) def test_snmpv3_md5(self): # Check if SNMPv3 can be configured with MD5 authentication # and service runs + self.cli_set(base_path + ['v3', 'engineid', snmpv3_engine_id]) - self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002']) - self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) - # check validate() - a view must be created before this can be comitted + # create user + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'plaintext-password', snmpv3_auth_pw]) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'type', 'md5']) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'plaintext-password', snmpv3_priv_pw]) + self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'type', 'des']) + + # check validate() - user requires a group to be created with self.assertRaises(ConfigSessionError): self.cli_commit() + self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', snmpv3_group]) - self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1']) - self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default']) + self.cli_set(base_path + ['v3', 'group', snmpv3_group, 'mode', 'ro']) + # check validate() - a view must be created before this can be comitted + with self.assertRaises(ConfigSessionError): + self.cli_commit() - # create user - self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) - self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + self.cli_set(base_path + ['v3', 'view', snmpv3_view, 'oid', snmpv3_view_oid]) + self.cli_set(base_path + ['v3', 'group', snmpv3_group, 'view', snmpv3_view]) self.cli_commit() @@ -157,13 +191,21 @@ class TestSNMPService(VyOSUnitTestSHIM.TestCase): tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) + hashed_password = 'e11c83f2c510540a3c4de84ee66de440' tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) - # TODO: read in config file and check values - - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + tmp = read_file(SNMPD_CONF) + # views + self.assertIn(f'view {snmpv3_view} included .{snmpv3_view_oid}', tmp) + # group + self.assertIn(f'group {snmpv3_group} usm {snmpv3_user}', tmp) + # access + self.assertIn(f'access {snmpv3_group} "" usm auth exact {snmpv3_view} none none', tmp) + + # Try SNMPv3 connection + tmp = call(f'snmpwalk -v 3 -u {snmpv3_user} -a MD5 -A {snmpv3_auth_pw} -x DES -X {snmpv3_priv_pw} -l authPriv 127.0.0.1', stdout=DEVNULL) + self.assertEqual(tmp, 0) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index b2934cf04..95c2a6c55 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -15,10 +15,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os +import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.firewall import find_nftables_rule from vyos.util import cmd from vyos.util import read_file @@ -156,8 +158,8 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): 'driver' : ['nf_nat_h323', 'nf_conntrack_h323'], }, 'nfs' : { - 'iptables' : ['-A VYATTA_CT_HELPER -p udp -m udp --dport 111 -j CT --helper rpc', - '-A VYATTA_CT_HELPER -p tcp -m tcp --dport 111 -j CT --helper rpc'], + 'nftables' : ['ct helper set "rpc_tcp"', + 'ct helper set "rpc_udp"'] }, 'pptp' : { 'driver' : ['nf_nat_pptp', 'nf_conntrack_pptp'], @@ -166,9 +168,7 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): 'driver' : ['nf_nat_sip', 'nf_conntrack_sip'], }, 'sqlnet' : { - 'iptables' : ['-A VYATTA_CT_HELPER -p tcp -m tcp --dport 1536 -j CT --helper tns', - '-A VYATTA_CT_HELPER -p tcp -m tcp --dport 1525 -j CT --helper tns', - '-A VYATTA_CT_HELPER -p tcp -m tcp --dport 1521 -j CT --helper tns'], + 'nftables' : ['ct helper set "tns_tcp"'] }, 'tftp' : { 'driver' : ['nf_nat_tftp', 'nf_conntrack_tftp'], @@ -187,10 +187,9 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): if 'driver' in module_options: for driver in module_options['driver']: self.assertTrue(os.path.isdir(f'/sys/module/{driver}')) - if 'iptables' in module_options: - rules = cmd('sudo iptables-save -t raw') - for ruleset in module_options['iptables']: - self.assertIn(ruleset, rules) + if 'nftables' in module_options: + for rule in module_options['nftables']: + self.assertTrue(find_nftables_rule('raw', 'VYOS_CT_HELPER', [rule]) != None) # unload modules for module in modules: @@ -204,10 +203,9 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): if 'driver' in module_options: for driver in module_options['driver']: self.assertFalse(os.path.isdir(f'/sys/module/{driver}')) - if 'iptables' in module_options: - rules = cmd('sudo iptables-save -t raw') - for ruleset in module_options['iptables']: - self.assertNotIn(ruleset, rules) + if 'nftables' in module_options: + for rule in module_options['nftables']: + self.assertTrue(find_nftables_rule('raw', 'VYOS_CT_HELPER', [rule]) == None) def test_conntrack_hash_size(self): hash_size = '65536' diff --git a/smoketest/scripts/cli/test_system_flow-accounting.py b/smoketest/scripts/cli/test_system_flow-accounting.py index a53999461..857df1be6 100755 --- a/smoketest/scripts/cli/test_system_flow-accounting.py +++ b/smoketest/scripts/cli/test_system_flow-accounting.py @@ -62,9 +62,20 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): self.cli_commit() # verify configuration - tmp = cmd('sudo iptables-save -t raw') + nftables_output = cmd('sudo nft list chain raw VYOS_CT_PREROUTING_HOOK').splitlines() for interface in Section.interfaces('ethernet'): - self.assertIn(f'-A VYATTA_CT_PREROUTING_HOOK -i {interface} -m comment --comment FLOW_ACCOUNTING_RULE -j NFLOG --nflog-group 2 --nflog-size 128 --nflog-threshold 100', tmp) + rule_found = False + ifname_search = f'iifname "{interface}"' + + for nftables_line in nftables_output: + if 'FLOW_ACCOUNTING_RULE' in nftables_line and ifname_search in nftables_line: + self.assertIn('group 2', nftables_line) + self.assertIn('snaplen 128', nftables_line) + self.assertIn('queue-threshold 100', nftables_line) + rule_found = True + break + + self.assertTrue(rule_found) uacctd = read_file(uacctd_conf) # circular queue size - buffer_size diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py index c710aec6e..1433c7329 100755 --- a/smoketest/scripts/cli/test_vpn_ipsec.py +++ b/smoketest/scripts/cli/test_vpn_ipsec.py @@ -111,9 +111,22 @@ rgiyCHemtMepq57Pl1Nmj49eEA== """ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): - def setUp(self): - self.cli_set(base_path + ['interface', f'{interface}.{vif}']) + @classmethod + def setUpClass(cls): + super(cls, cls).setUpClass() + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + cls.cli_set(cls, base_path + ['interface', f'{interface}.{vif}']) + + @classmethod + def tearDownClass(cls): + super(cls, cls).tearDownClass() + + cls.cli_delete(cls, base_path + ['interface', f'{interface}.{vif}']) + def setUp(self): # Set IKE/ESP Groups self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes128']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha1']) @@ -127,7 +140,6 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) self.cli_delete(tunnel_path) - self.cli_delete(ethernet_path) self.cli_commit() # Check for no longer running process @@ -158,6 +170,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): # Site to site local_address = '192.0.2.10' + priority = '20' peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) @@ -173,6 +186,10 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.11.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'port', '443']) + self.cli_set(peer_base_path + ['tunnel', '2', 'local', 'prefix', '10.1.0.0/16']) + self.cli_set(peer_base_path + ['tunnel', '2', 'remote', 'prefix', '10.2.0.0/16']) + self.cli_set(peer_base_path + ['tunnel', '2', 'priority', priority]) + self.cli_commit() # Verify strongSwan configuration @@ -187,8 +204,15 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', f'mode = tunnel', + f'peer_{peer_ip.replace(".","-")}_tunnel_1', f'local_ts = 172.16.10.0/24[tcp/443],172.16.11.0/24[tcp/443]', - f'remote_ts = 172.17.10.0/24[tcp/443],172.17.11.0/24[tcp/443]' + f'remote_ts = 172.17.10.0/24[tcp/443],172.17.11.0/24[tcp/443]', + f'mode = tunnel', + f'peer_{peer_ip.replace(".","-")}_tunnel_2', + f'local_ts = 10.1.0.0/16', + f'remote_ts = 10.2.0.0/16', + f'priority = {priority}', + f'mode = tunnel', ] for line in swanctl_conf_lines: self.assertIn(line, swanctl_conf) diff --git a/smoketest/scripts/cli/test_zone_policy.py b/smoketest/scripts/cli/test_zone_policy.py new file mode 100755 index 000000000..c0af6164b --- /dev/null +++ b/smoketest/scripts/cli/test_zone_policy.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.util import cmd + +class TestZonePolicy(VyOSUnitTestSHIM.TestCase): + def setUp(self): + self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) + + def tearDown(self): + self.cli_delete(['zone-policy']) + self.cli_delete(['firewall']) + self.cli_commit() + + def test_basic_zone(self): + self.cli_set(['zone-policy', 'zone', 'smoketest-eth0', 'interface', 'eth0']) + self.cli_set(['zone-policy', 'zone', 'smoketest-eth0', 'from', 'smoketest-local', 'firewall', 'name', 'smoketest']) + self.cli_set(['zone-policy', 'zone', 'smoketest-local', 'local-zone']) + self.cli_set(['zone-policy', 'zone', 'smoketest-local', 'from', 'smoketest-eth0', 'firewall', 'name', 'smoketest']) + + self.cli_commit() + + nftables_search = [ + ['chain VZONE_smoketest-eth0'], + ['chain VZONE_smoketest-local_IN'], + ['chain VZONE_smoketest-local_OUT'], + ['oifname { "eth0" }', 'jump VZONE_smoketest-eth0'], + ['jump VZONE_smoketest-local_IN'], + ['jump VZONE_smoketest-local_OUT'], + ['iifname { "eth0" }', 'jump smoketest'], + ['oifname { "eth0" }', 'jump smoketest'] + ] + + nftables_output = cmd('sudo nft list table ip filter') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |