diff options
Diffstat (limited to 'smoketest/scripts/cli')
41 files changed, 2442 insertions, 661 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index b2acb03cc..471bdaffb 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -27,6 +27,17 @@ from vyos.util import process_named_running class BasicAccelPPPTest: class TestCase(VyOSUnitTestSHIM.TestCase): + + @classmethod + def setUpClass(cls): + cls._process_name = 'accel-pppd' + + super(BasicAccelPPPTest.TestCase, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, cls._base_path) + def setUp(self): self._gateway = '192.0.2.1' # ensure we can also run this test on a live system - so lets clean @@ -34,9 +45,15 @@ class BasicAccelPPPTest: self.cli_delete(self._base_path) def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(self._process_name)) + self.cli_delete(self._base_path) self.cli_commit() + # Check for running process + self.assertFalse(process_named_running(self._process_name)) + def set(self, path): self.cli_set(self._base_path + path) @@ -113,9 +130,6 @@ class BasicAccelPPPTest: tmp = re.findall(regex, tmp) self.assertTrue(tmp) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) - # Check local-users default value(s) self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) # commit changes @@ -127,9 +141,6 @@ class BasicAccelPPPTest: tmp = re.findall(regex, tmp) self.assertTrue(tmp) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) - def test_accel_radius_authentication(self): # Test configuration of RADIUS authentication for PPPoE server self.basic_config() @@ -186,9 +197,6 @@ class BasicAccelPPPTest: self.assertEqual(f'req-limit=0', server[4]) self.assertEqual(f'fail-time=0', server[5]) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) - # # Disable Radius Accounting # diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 816ba6dcd..55343b893 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -232,6 +232,9 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] + # just set the interface base without any option - some interfaces + # (VTI) do not require any option to be brought up + self.cli_set(base) for option in self._options.get(interface, []): self.cli_set(base + option.split()) @@ -635,6 +638,7 @@ class BasicInterfaceTest: self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo]) self.cli_set(path + ['ip', 'disable-arp-filter']) self.cli_set(path + ['ip', 'disable-forwarding']) + self.cli_set(path + ['ip', 'enable-directed-broadcast']) self.cli_set(path + ['ip', 'enable-arp-accept']) self.cli_set(path + ['ip', 'enable-arp-announce']) self.cli_set(path + ['ip', 'enable-arp-ignore']) @@ -671,6 +675,9 @@ class BasicInterfaceTest: tmp = read_file(f'{proc_base}/forwarding') self.assertEqual('0', tmp) + tmp = read_file(f'{proc_base}/bc_forwarding') + self.assertEqual('1', tmp) + tmp = read_file(f'{proc_base}/proxy_arp') self.assertEqual('1', tmp) diff --git a/smoketest/scripts/cli/test_component_version.py b/smoketest/scripts/cli/test_component_version.py index 777379bdd..7b1b12c53 100755 --- a/smoketest/scripts/cli/test_component_version.py +++ b/smoketest/scripts/cli/test_component_version.py @@ -16,7 +16,7 @@ import unittest -from vyos.systemversions import get_system_versions, get_system_component_version +import vyos.component_version as component_version # After T3474, component versions should be updated in the files in # vyos-1x/interface-definitions/include/version/ @@ -24,13 +24,27 @@ from vyos.systemversions import get_system_versions, get_system_component_versio # that in the xml cache. class TestComponentVersion(unittest.TestCase): def setUp(self): - self.legacy_d = get_system_versions() - self.xml_d = get_system_component_version() + self.legacy_d = component_version.legacy_from_system() + self.xml_d = component_version.from_system() + self.set_legacy_d = set(self.legacy_d) + self.set_xml_d = set(self.xml_d) def test_component_version(self): - self.assertTrue(set(self.legacy_d).issubset(set(self.xml_d))) + bool_issubset = (self.set_legacy_d.issubset(self.set_xml_d)) + if not bool_issubset: + missing = self.set_legacy_d.difference(self.set_xml_d) + print(f'\n\ncomponents in legacy but not in XML: {missing}') + print('new components must be listed in xml-component-version.xml.in') + self.assertTrue(bool_issubset) + + bad_component_version = False for k, v in self.legacy_d.items(): - self.assertTrue(v <= self.xml_d[k]) + bool_inequality = (v <= self.xml_d[k]) + if not bool_inequality: + print(f'\n\n{k} has not been updated in XML component versions:') + print(f'legacy version {v}; XML version {self.xml_d[k]}') + bad_component_version = True + self.assertFalse(bad_component_version) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_container.py b/smoketest/scripts/cli/test_container.py index cc0cdaec0..902156ee6 100644..100755 --- a/smoketest/scripts/cli/test_container.py +++ b/smoketest/scripts/cli/test_container.py @@ -15,6 +15,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest +import glob import json from base_vyostest_shim import VyOSUnitTestSHIM @@ -25,10 +26,13 @@ from vyos.util import process_named_running from vyos.util import read_file base_path = ['container'] -cont_image = 'busybox' +cont_image = 'busybox:stable' # busybox is included in vyos-build prefix = '192.168.205.0/24' net_name = 'NET01' -PROCESS_NAME = 'podman' +PROCESS_NAME = 'conmon' +PROCESS_PIDFILE = '/run/vyos-container-{0}.service.pid' + +busybox_image_path = '/usr/share/vyos/busybox-stable.tar' def cmd_to_json(command): c = cmd(command + ' --format=json') @@ -37,7 +41,34 @@ def cmd_to_json(command): return data -class TesContainer(VyOSUnitTestSHIM.TestCase): +class TestContainer(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestContainer, cls).setUpClass() + + # Load image for smoketest provided in vyos-build + try: + cmd(f'cat {busybox_image_path} | sudo podman load') + except: + cls.skipTest(cls, reason='busybox image not available') + + @classmethod + def tearDownClass(cls): + super(TestContainer, cls).tearDownClass() + + # Cleanup podman image + cmd(f'sudo podman image rm -f {cont_image}') + + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + + # Ensure no container process remains + self.assertIsNone(process_named_running(PROCESS_NAME)) + + # Ensure systemd units are removed + units = glob.glob('/run/systemd/system/vyos-container-*') + self.assertEqual(units, []) def test_01_basic_container(self): cont_name = 'c1' @@ -53,13 +84,17 @@ class TesContainer(VyOSUnitTestSHIM.TestCase): # commit changes self.cli_commit() + pid = 0 + with open(PROCESS_PIDFILE.format(cont_name), 'r') as f: + pid = int(f.read()) + # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertEqual(process_named_running(PROCESS_NAME), pid) def test_02_container_network(self): cont_name = 'c2' cont_ip = '192.168.205.25' - self.cli_set(base_path + ['network', net_name, 'ipv4-prefix', prefix]) + self.cli_set(base_path + ['network', net_name, 'prefix', prefix]) self.cli_set(base_path + ['name', cont_name, 'image', cont_image]) self.cli_set(base_path + ['name', cont_name, 'network', net_name, 'address', cont_ip]) @@ -67,7 +102,7 @@ class TesContainer(VyOSUnitTestSHIM.TestCase): self.cli_commit() n = cmd_to_json(f'sudo podman network inspect {net_name}') - json_subnet = n['plugins'][0]['ipam']['ranges'][0][0]['subnet'] + json_subnet = n['subnets'][0]['subnet'] c = cmd_to_json(f'sudo podman container inspect {cont_name}') json_ip = c['NetworkSettings']['Networks'][net_name]['IPAddress'] diff --git a/smoketest/scripts/cli/test_dependency_graph.py b/smoketest/scripts/cli/test_dependency_graph.py new file mode 100755 index 000000000..45a40acc4 --- /dev/null +++ b/smoketest/scripts/cli/test_dependency_graph.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import unittest +from graphlib import TopologicalSorter, CycleError + +DEP_FILE = '/usr/share/vyos/config-mode-dependencies.json' + +def graph_from_dict(d): + g = {} + for k in list(d): + g[k] = set() + # add the dependencies for every sub-case; should there be cases + # that are mutally exclusive in the future, the graphs will be + # distinguished + for el in list(d[k]): + g[k] |= set(d[k][el]) + return g + +class TestDependencyGraph(unittest.TestCase): + def setUp(self): + with open(DEP_FILE) as f: + dd = json.load(f) + self.dependency_graph = graph_from_dict(dd) + + def test_cycles(self): + ts = TopologicalSorter(self.dependency_graph) + out = None + try: + # get node iterator + order = ts.static_order() + # try iteration + _ = [*order] + except CycleError as e: + out = e.args + + self.assertIsNone(out) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index b8f944575..09b520b72 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -17,10 +17,13 @@ import unittest from glob import glob +from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.configsession import ConfigSessionError from vyos.util import cmd +from vyos.util import run sysfs_config = { 'all_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_all', 'default': '0', 'test_value': 'disable'}, @@ -44,23 +47,84 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): # out the current configuration :) cls.cli_delete(cls, ['firewall']) - cls.cli_set(cls, ['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) - @classmethod def tearDownClass(cls): - cls.cli_delete(cls, ['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) super(TestFirewall, cls).tearDownClass() def tearDown(self): - self.cli_delete(['interfaces', 'ethernet', 'eth0', 'firewall']) self.cli_delete(['firewall']) self.cli_commit() + # Verify chains/sets are cleaned up from nftables + nftables_search = [ + ['set M_smoketest_mac'], + ['set N_smoketest_network'], + ['set P_smoketest_port'], + ['set D_smoketest_domain'], + ['set RECENT_smoketest_4'], + ['chain NAME_smoketest'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_filter', inverse=True) + + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): + # Resolver no longer blocks commit, need to wait for daemon to populate set + count = 0 + while count < max_wait: + code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') + if code == 0: + return True + count += 1 + sleep(1) + return False + + def test_geoip(self): + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'drop']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'se']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'gb']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'de']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'fr']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'inverse-match']) + + self.cli_commit() + + nftables_search = [ + ['ip saddr @GEOIP_CC_smoketest_1', 'drop'], + ['ip saddr != @GEOIP_CC_smoketest_2', 'return'] + ] + + # -t prevents 1000+ GeoIP elements being returned + self.verify_nftables(nftables_search, 'ip vyos_filter', args='-t') + def test_groups(self): + hostmap_path = ['system', 'static-host-mapping', 'host-name'] + example_org = ['192.0.2.8', '192.0.2.10', '192.0.2.11'] + + self.cli_set(hostmap_path + ['example.com', 'inet', '192.0.2.5']) + for ips in example_org: + self.cli_set(hostmap_path + ['example.org', 'inet', ips]) + + self.cli_commit() + self.cli_set(['firewall', 'group', 'mac-group', 'smoketest_mac', 'mac-address', '00:01:02:03:04:05']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123']) + self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.com']) + self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.org']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) @@ -68,93 +132,309 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'group', 'mac-group', 'smoketest_mac']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'source', 'group', 'domain-group', 'smoketest_domain']) - self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) + self.cli_set(['firewall', 'interface', 'eth0', 'in', 'name', 'smoketest']) self.cli_commit() + self.wait_for_domain_resolver('ip vyos_filter', 'D_smoketest_domain', '192.0.2.5') + nftables_search = [ ['iifname "eth0"', 'jump NAME_smoketest'], - ['ip saddr { 172.16.99.0/24 }', 'ip daddr 172.16.10.10', 'th dport { 53, 123 }', 'return'], - ['ether saddr { 00:01:02:03:04:05 }', 'return'] + ['ip saddr @N_smoketest_network', 'ip daddr 172.16.10.10', 'th dport @P_smoketest_port', 'return'], + ['elements = { 172.16.99.0/24 }'], + ['elements = { 53, 123 }'], + ['ether saddr @M_smoketest_mac', 'return'], + ['elements = { 00:01:02:03:04:05 }'], + ['set D_smoketest_domain'], + ['elements = { 192.0.2.5, 192.0.2.8,'], + ['192.0.2.10, 192.0.2.11 }'], + ['ip saddr @D_smoketest_domain', 'return'] ] + self.verify_nftables(nftables_search, 'ip vyos_filter') - nftables_output = cmd('sudo nft list table ip filter') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched, msg=search) + self.cli_delete(['system', 'static-host-mapping']) + self.cli_commit() - def test_basic_rules(self): - self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) + def test_nested_groups(self): + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) + self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) + self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'port', '123']) + self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'include', 'smoketest_port']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'protocol', 'tcp']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'destination', 'port', '8888']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'syn']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'action', 'accept']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'protocol', 'tcp']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'destination', 'port', '22']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'limit', 'rate', '5/minute']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network1']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port1']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) - self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) + self.cli_set(['firewall', 'interface', 'eth0', 'in', 'name', 'smoketest']) self.cli_commit() + # Test circular includes + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) + nftables_search = [ ['iifname "eth0"', 'jump NAME_smoketest'], - ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'return'], - ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'reject'], - ['tcp dport { 22 }', 'limit rate 5/minute', 'return'], - ['smoketest default-action', 'drop'] + ['ip saddr @N_smoketest_network1', 'th dport @P_smoketest_port1', 'return'], + ['elements = { 172.16.99.0/24, 172.16.101.0/24 }'], + ['elements = { 53, 123 }'] ] - nftables_output = cmd('sudo nft list table ip filter') + self.verify_nftables(nftables_search, 'ip vyos_filter') + + def test_ipv4_basic_rules(self): + name = 'smoketest' + interface = 'eth0' + mss_range = '501-1460' + + self.cli_set(['firewall', 'name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'name', name, 'enable-default-log']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'source', 'address', '172.16.20.10']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'destination', 'address', '172.16.10.10']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'log', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'log-level', 'debug']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'ttl', 'eq', '15']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'protocol', 'tcp']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'destination', 'port', '8888']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'log', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'log-level', 'err']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'tcp', 'flags', 'syn']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'tcp', 'flags', 'not', 'ack']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'ttl', 'gt', '102']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'protocol', 'tcp']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'destination', 'port', '22']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'limit', 'rate', '5/minute']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'log', 'disable']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'action', 'drop']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'protocol', 'tcp']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'destination', 'port', '22']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'recent', 'count', '10']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'recent', 'time', 'minute']) + self.cli_set(['firewall', 'name', name, 'rule', '5', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '5', 'protocol', 'tcp']) + self.cli_set(['firewall', 'name', name, 'rule', '5', 'tcp', 'flags', 'syn']) + self.cli_set(['firewall', 'name', name, 'rule', '5', 'tcp', 'mss', mss_range]) + self.cli_set(['firewall', 'name', name, 'rule', '5', 'inbound-interface', interface]) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'action', 'return']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'protocol', 'gre']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'outbound-interface', interface]) + + self.cli_set(['firewall', 'interface', interface, 'in', 'name', name]) - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched, msg=search) + self.cli_commit() + + nftables_search = [ + [f'iifname "{interface}"', f'jump NAME_{name}'], + ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'log prefix "[smoketest-1-A]" level debug', 'ip ttl 15', 'return'], + ['tcp flags syn / syn,ack', 'tcp dport 8888', 'log prefix "[smoketest-2-R]" level err', 'ip ttl > 102', 'reject'], + ['tcp dport 22', 'limit rate 5/minute', 'return'], + ['log prefix "[smoketest-default-D]"','smoketest default-action', 'drop'], + ['tcp dport 22', 'add @RECENT_smoketest_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'drop'], + ['tcp flags & syn == syn', f'tcp option maxseg size {mss_range}', f'iifname "{interface}"'], + ['meta l4proto gre', f'oifname "{interface}"', 'return'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_filter') - def test_basic_rules_ipv6(self): - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'default-action', 'drop']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'action', 'accept']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'source', 'address', '2002::1']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'destination', 'address', '2002::1:1']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'action', 'reject']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'protocol', 'tcp_udp']) - self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'destination', 'port', '8888']) + def test_ipv4_advanced(self): + name = 'smoketest-adv' + name2 = 'smoketest-adv2' + interface = 'eth0' - self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'ipv6-name', 'v6-smoketest']) + self.cli_set(['firewall', 'name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'name', name, 'enable-default-log']) + + self.cli_set(['firewall', 'name', name, 'rule', '6', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'packet-length', '64']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'packet-length', '512']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'packet-length', '1024']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'dscp', '17']) + self.cli_set(['firewall', 'name', name, 'rule', '6', 'dscp', '52']) + + self.cli_set(['firewall', 'name', name, 'rule', '7', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '7', 'packet-length', '1-30000']) + self.cli_set(['firewall', 'name', name, 'rule', '7', 'packet-length-exclude', '60000-65535']) + self.cli_set(['firewall', 'name', name, 'rule', '7', 'dscp', '3-11']) + self.cli_set(['firewall', 'name', name, 'rule', '7', 'dscp-exclude', '21-25']) + + self.cli_set(['firewall', 'name', name2, 'default-action', 'jump']) + self.cli_set(['firewall', 'name', name2, 'default-jump-target', name]) + self.cli_set(['firewall', 'name', name2, 'enable-default-log']) + self.cli_set(['firewall', 'name', name2, 'rule', '1', 'source', 'address', '198.51.100.1']) + self.cli_set(['firewall', 'name', name2, 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'name', name2, 'rule', '1', 'jump-target', name]) + + self.cli_set(['firewall', 'interface', interface, 'in', 'name', name]) self.cli_commit() nftables_search = [ - ['iifname "eth0"', 'jump NAME6_v6-smoketest'], - ['saddr 2002::1', 'daddr 2002::1:1', 'return'], - ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'reject'], - ['smoketest default-action', 'drop'] + [f'iifname "{interface}"', f'jump NAME_{name}'], + ['ip length { 64, 512, 1024 }', 'ip dscp { 0x11, 0x34 }', 'return'], + ['ip length 1-30000', 'ip length != 60000-65535', 'ip dscp 0x03-0x0b', 'ip dscp != 0x15-0x19', 'return'], + [f'log prefix "[{name}-default-D]"', 'drop'], + ['ip saddr 198.51.100.1', f'jump NAME_{name}'], + [f'log prefix "[{name2}-default-J]"', f'jump NAME_{name}'] ] - nftables_output = cmd('sudo nft list table ip6 filter') + self.verify_nftables(nftables_search, 'ip vyos_filter') - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched, msg=search) + def test_ipv4_mask(self): + name = 'smoketest-mask' + interface = 'eth0' + + self.cli_set(['firewall', 'group', 'address-group', 'mask_group', 'address', '1.1.1.1']) + + self.cli_set(['firewall', 'name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'name', name, 'enable-default-log']) + + self.cli_set(['firewall', 'name', name, 'rule', '1', 'action', 'drop']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'destination', 'address', '0.0.1.2']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'destination', 'address-mask', '0.0.255.255']) + + self.cli_set(['firewall', 'name', name, 'rule', '2', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'source', 'address', '!0.0.3.4']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'source', 'address-mask', '0.0.255.255']) + + self.cli_set(['firewall', 'name', name, 'rule', '3', 'action', 'drop']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'source', 'address-mask', '0.0.255.255']) + + self.cli_set(['firewall', 'interface', interface, 'in', 'name', name]) + + self.cli_commit() + + nftables_search = [ + [f'daddr & 0.0.255.255 == 0.0.1.2'], + [f'saddr & 0.0.255.255 != 0.0.3.4'], + [f'saddr & 0.0.255.255 == @A_mask_group'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_filter') + + + def test_ipv6_basic_rules(self): + name = 'v6-smoketest' + interface = 'eth0' + + self.cli_set(['firewall', 'ipv6-name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', name, 'enable-default-log']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'source', 'address', '2002::1']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'destination', 'address', '2002::1:1']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'log', 'enable']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'log-level', 'crit']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'protocol', 'tcp_udp']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'destination', 'port', '8888']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'inbound-interface', interface]) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'action', 'return']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'protocol', 'gre']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'outbound-interface', interface]) + + self.cli_set(['firewall', 'interface', interface, 'in', 'ipv6-name', name]) + + self.cli_commit() + + nftables_search = [ + [f'iifname "{interface}"', f'jump NAME6_{name}'], + ['saddr 2002::1', 'daddr 2002::1:1', 'log prefix "[v6-smoketest-1-A]" level crit', 'return'], + ['meta l4proto { tcp, udp }', 'th dport 8888', f'iifname "{interface}"', 'reject'], + ['meta l4proto gre', f'oifname "{interface}"', 'return'], + ['smoketest default-action', f'log prefix "[{name}-default-D]"', 'drop'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_filter') + + def test_ipv6_advanced(self): + name = 'v6-smoketest-adv' + name2 = 'v6-smoketest-adv2' + interface = 'eth0' + + self.cli_set(['firewall', 'ipv6-name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', name, 'enable-default-log']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'packet-length', '65']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'packet-length', '513']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'packet-length', '1025']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'dscp', '18']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'dscp', '53']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'packet-length', '1-1999']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'packet-length-exclude', '60000-65535']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'dscp', '4-14']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'dscp-exclude', '31-35']) + + self.cli_set(['firewall', 'ipv6-name', name2, 'default-action', 'jump']) + self.cli_set(['firewall', 'ipv6-name', name2, 'default-jump-target', name]) + self.cli_set(['firewall', 'ipv6-name', name2, 'enable-default-log']) + self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'source', 'address', '2001:db8::/64']) + self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'action', 'jump']) + self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'jump-target', name]) + + self.cli_set(['firewall', 'interface', interface, 'in', 'ipv6-name', name]) + + self.cli_commit() + + nftables_search = [ + [f'iifname "{interface}"', f'jump NAME6_{name}'], + ['ip6 length { 65, 513, 1025 }', 'ip6 dscp { af21, 0x35 }', 'return'], + ['ip6 length 1-1999', 'ip6 length != 60000-65535', 'ip6 dscp 0x04-0x0e', 'ip6 dscp != 0x1f-0x23', 'return'], + [f'log prefix "[{name}-default-D]"', 'drop'], + ['ip6 saddr 2001:db8::/64', f'jump NAME6_{name}'], + [f'log prefix "[{name2}-default-J]"', f'jump NAME6_{name}'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_filter') + + def test_ipv6_mask(self): + name = 'v6-smoketest-mask' + interface = 'eth0' + + self.cli_set(['firewall', 'group', 'ipv6-address-group', 'mask_group', 'address', '::beef']) + + self.cli_set(['firewall', 'ipv6-name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', name, 'enable-default-log']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'destination', 'address', '::1111:2222:3333:4444']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '1', 'destination', 'address-mask', '::ffff:ffff:ffff:ffff']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'source', 'address', '!::aaaa:bbbb:cccc:dddd']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) + + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) + self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) + + self.cli_set(['firewall', 'interface', interface, 'in', 'ipv6-name', name]) + + self.cli_commit() + + nftables_search = [ + ['daddr & ::ffff:ffff:ffff:ffff == ::1111:2222:3333:4444'], + ['saddr & ::ffff:ffff:ffff:ffff != ::aaaa:bbbb:cccc:dddd'], + ['saddr & ::ffff:ffff:ffff:ffff == @A6_mask_group'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_state_policy(self): self.cli_set(['firewall', 'state-policy', 'established', 'action', 'accept']) @@ -164,53 +444,48 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.cli_commit() chains = { - 'ip filter': ['VYOS_FW_FORWARD', 'VYOS_FW_OUTPUT', 'VYOS_FW_LOCAL'], - 'ip6 filter': ['VYOS_FW6_FORWARD', 'VYOS_FW6_OUTPUT', 'VYOS_FW6_LOCAL'] + 'ip vyos_filter': ['VYOS_FW_FORWARD', 'VYOS_FW_OUTPUT', 'VYOS_FW_LOCAL'], + 'ip6 vyos_filter': ['VYOS_FW6_FORWARD', 'VYOS_FW6_OUTPUT', 'VYOS_FW6_LOCAL'] } - for table in ['ip filter', 'ip6 filter']: + for table in ['ip vyos_filter', 'ip6 vyos_filter']: for chain in chains[table]: nftables_output = cmd(f'sudo nft list chain {table} {chain}') self.assertTrue('jump VYOS_STATE_POLICY' in nftables_output) - def test_state_and_status_rules(self): - self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'state', 'established', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'state', 'related', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'state', 'invalid', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'action', 'accept']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'state', 'new', 'enable']) + def test_ipv4_state_and_status_rules(self): + name = 'smoketest-state' + interface = 'eth0' + + self.cli_set(['firewall', 'name', name, 'default-action', 'drop']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'state', 'established', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '1', 'state', 'related', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'action', 'reject']) + self.cli_set(['firewall', 'name', name, 'rule', '2', 'state', 'invalid', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'state', 'new', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'connection-status', 'nat', 'destination']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'action', 'accept']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'state', 'new', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'state', 'established', 'enable']) - self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'connection-status', 'nat', 'source']) + self.cli_set(['firewall', 'name', name, 'rule', '3', 'connection-status', 'nat', 'destination']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'action', 'accept']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'state', 'new', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'state', 'established', 'enable']) + self.cli_set(['firewall', 'name', name, 'rule', '4', 'connection-status', 'nat', 'source']) - self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) + self.cli_set(['firewall', 'interface', interface, 'in', 'name', name]) self.cli_commit() nftables_search = [ - ['iifname "eth0"', 'jump NAME_smoketest'], + [f'iifname "{interface}"', f'jump NAME_{name}'], ['ct state { established, related }', 'return'], - ['ct state { invalid }', 'reject'], - ['ct state { new }', 'ct status { dnat }', 'return'], - ['ct state { established, new }', 'ct status { snat }', 'return'], - ['smoketest default-action', 'drop'] + ['ct state invalid', 'reject'], + ['ct state new', 'ct status dnat', 'return'], + ['ct state { established, new }', 'ct status snat', 'return'], + ['drop', f'comment "{name} default-action drop"'] ] - nftables_output = cmd('sudo nft list table ip filter') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched, msg=search) + self.verify_nftables(nftables_search, 'ip vyos_filter') def test_sysfs(self): for name, conf in sysfs_config.items(): @@ -229,5 +504,35 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): with open(path, 'r') as f: self.assertNotEqual(f.read().strip(), conf['default'], msg=path) + def test_zone_basic(self): + self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) + self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'interface', 'eth0']) + self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'from', 'smoketest-local', 'firewall', 'name', 'smoketest']) + self.cli_set(['firewall', 'zone', 'smoketest-local', 'local-zone']) + self.cli_set(['firewall', 'zone', 'smoketest-local', 'from', 'smoketest-eth0', 'firewall', 'name', 'smoketest']) + + self.cli_commit() + + nftables_search = [ + ['chain VZONE_smoketest-eth0'], + ['chain VZONE_smoketest-local_IN'], + ['chain VZONE_smoketest-local_OUT'], + ['oifname "eth0"', 'jump VZONE_smoketest-eth0'], + ['jump VZONE_smoketest-local_IN'], + ['jump VZONE_smoketest-local_OUT'], + ['iifname "eth0"', 'jump NAME_smoketest'], + ['oifname "eth0"', 'jump NAME_smoketest'] + ] + + nftables_output = cmd('sudo nft list table ip vyos_filter') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(matched) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index 237abb487..cd3995ed9 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -49,7 +49,7 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): if not '.' in tmp: cls._members.append(tmp) - cls._options['bond0'] = [] + cls._options = {'bond0' : []} for member in cls._members: cls._options['bond0'].append(f'member interface {member}') cls._interfaces = list(cls._options) @@ -136,7 +136,7 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): def test_bonding_hash_policy(self): # Define available bonding hash policies - hash_policies = ['layer2', 'layer2+3', 'layer2+3', 'encap2+3', 'encap3+4'] + hash_policies = ['layer2', 'layer2+3', 'encap2+3', 'encap3+4'] for hash_policy in hash_policies: for interface in self._interfaces: for option in self._options.get(interface, []): @@ -151,6 +151,29 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): defined_policy = read_file(f'/sys/class/net/{interface}/bonding/xmit_hash_policy').split() self.assertEqual(defined_policy[0], hash_policy) + def test_bonding_mii_monitoring_interval(self): + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + + self.cli_commit() + + # verify default + for interface in self._interfaces: + tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() + self.assertIn('100', tmp) + + mii_mon = '250' + for interface in self._interfaces: + self.cli_set(self._base_path + [interface, 'mii-mon-interval', mii_mon]) + + self.cli_commit() + + # verify new CLI value + for interface in self._interfaces: + tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() + self.assertIn(mii_mon, tmp) + def test_bonding_multi_use_member(self): # Define available bonding hash policies for interface in ['bond10', 'bond20']: @@ -165,6 +188,46 @@ class BondingInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() + def test_bonding_source_interface(self): + # Re-use member interface that is already a source-interface + bond = 'bond99' + pppoe = 'pppoe98756' + member = next(iter(self._members)) + + self.cli_set(self._base_path + [bond, 'member', 'interface', member]) + self.cli_set(['interfaces', 'pppoe', pppoe, 'source-interface', member]) + + # check validate() - can not add interface to bond, it is the source-interface of ... + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['interfaces', 'pppoe', pppoe]) + self.cli_commit() + + # verify config + slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() + self.assertIn(member, slaves) + + def test_bonding_source_bridge_interface(self): + # Re-use member interface that is already a source-interface + bond = 'bond1097' + bridge = 'br6327' + member = next(iter(self._members)) + + self.cli_set(self._base_path + [bond, 'member', 'interface', member]) + self.cli_set(['interfaces', 'bridge', bridge, 'member', 'interface', member]) + + # check validate() - can not add interface to bond, it is a member of bridge ... + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(['interfaces', 'bridge', bridge]) + self.cli_commit() + + # verify config + slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() + self.assertIn(member, slaves) + def test_bonding_uniq_member_description(self): ethernet_path = ['interfaces', 'ethernet'] for interface in self._interfaces: diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index ca0ead9e8..6d7af78eb 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -19,6 +19,7 @@ import json import unittest from base_interfaces_test import BasicInterfaceTest +from copy import deepcopy from glob import glob from netifaces import interfaces @@ -86,9 +87,83 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): # validate member interface configuration for member in self._members: tmp = get_interface_config(member) + # verify member is assigned to the bridge + self.assertEqual(interface, tmp['master']) # Isolated must be enabled as configured above self.assertTrue(tmp['linkinfo']['info_slave_data']['isolated']) + def test_igmp_querier_snooping(self): + # Add member interfaces to bridge + for interface in self._interfaces: + base = self._base_path + [interface] + + # assign members to bridge interface + for member in self._members: + base_member = base + ['member', 'interface', member] + self.cli_set(base_member) + + # commit config + self.cli_commit() + + for interface in self._interfaces: + # Verify IGMP default configuration + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_snooping') + self.assertEqual(tmp, '0') + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_querier') + self.assertEqual(tmp, '0') + + # Enable IGMP snooping + for interface in self._interfaces: + base = self._base_path + [interface] + self.cli_set(base + ['igmp', 'snooping']) + + # commit config + self.cli_commit() + + for interface in self._interfaces: + # Verify IGMP snooping configuration + # Verify IGMP default configuration + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_snooping') + self.assertEqual(tmp, '1') + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_querier') + self.assertEqual(tmp, '0') + + # Enable IGMP querieer + for interface in self._interfaces: + base = self._base_path + [interface] + self.cli_set(base + ['igmp', 'querier']) + + # commit config + self.cli_commit() + + for interface in self._interfaces: + # Verify IGMP snooping & querier configuration + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_snooping') + self.assertEqual(tmp, '1') + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_querier') + self.assertEqual(tmp, '1') + + # Disable IGMP + for interface in self._interfaces: + base = self._base_path + [interface] + self.cli_delete(base + ['igmp']) + + # commit config + self.cli_commit() + + for interface in self._interfaces: + # Verify IGMP snooping & querier configuration + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_snooping') + self.assertEqual(tmp, '0') + tmp = read_file(f'/sys/class/net/{interface}/bridge/multicast_querier') + self.assertEqual(tmp, '0') + + # validate member interface configuration + for member in self._members: + tmp = get_interface_config(member) + # verify member is assigned to the bridge + self.assertEqual(interface, tmp['master']) + def test_add_remove_bridge_member(self): # Add member interfaces to bridge and set STP cost/priority @@ -150,87 +225,123 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): super().test_vif_8021q_mtu_limits() def test_bridge_vlan_filter(self): - vif_vlan = 2 + vifs = ['10', '20', '30', '40'] + native_vlan = '20' + # Add member interface to bridge and set VLAN filter for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['enable-vlan']) self.cli_set(base + ['address', '192.0.2.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'address', '192.0.3.1/24']) - self.cli_set(base + ['vif', str(vif_vlan), 'mtu', self._mtu]) - vlan_id = 101 - allowed_vlan = 2 - allowed_vlan_range = '4-9' - # assign members to bridge interface + for vif in vifs: + self.cli_set(base + ['vif', vif, 'address', f'192.0.{vif}.1/24']) + self.cli_set(base + ['vif', vif, 'mtu', self._mtu]) + for member in self._members: base_member = base + ['member', 'interface', member] - self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)]) - self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range]) - self.cli_set(base_member + ['native-vlan', str(vlan_id)]) - vlan_id += 1 + self.cli_set(base_member + ['native-vlan', native_vlan]) + for vif in vifs: + self.cli_set(base_member + ['allowed-vlan', vif]) # commit config self.cli_commit() - # Detect the vlan filter function + def _verify_members(interface, members) -> None: + # check member interfaces are added on the bridge + bridge_members = [] + for tmp in glob(f'/sys/class/net/{interface}/lower_*'): + bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + + self.assertListEqual(sorted(members), sorted(bridge_members)) + + def _check_vlan_filter(interface, vifs) -> None: + configured_vlan_ids = [] + + bridge_json = cmd(f'bridge -j vlan show dev {interface}') + bridge_json = json.loads(bridge_json) + self.assertIsNotNone(bridge_json) + + for tmp in bridge_json: + self.assertIn('vlans', tmp) + + for vlan in tmp['vlans']: + self.assertIn('vlan', vlan) + configured_vlan_ids.append(str(vlan['vlan'])) + + # Verify native VLAN ID has 'PVID' flag set on individual member ports + if not interface.startswith('br') and str(vlan['vlan']) == native_vlan: + self.assertIn('flags', vlan) + self.assertIn('PVID', vlan['flags']) + + self.assertListEqual(sorted(configured_vlan_ids), sorted(vifs)) + + # Verify correct setting of VLAN filter function for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bridge/vlan_filtering') self.assertEqual(tmp, '1') - # Execute the program to obtain status information - json_data = cmd('bridge -j vlan show', shell=True) - vlan_filter_status = None - vlan_filter_status = json.loads(json_data) - - if vlan_filter_status is not None: - for interface_status in vlan_filter_status: - ifname = interface_status['ifname'] - for interface in self._members: - vlan_success = 0; - if interface == ifname: - vlans_status = interface_status['vlans'] - for vlan_status in vlans_status: - vlan_id = vlan_status['vlan'] - flag_num = 0 - if 'flags' in vlan_status: - flags = vlan_status['flags'] - for flag in flags: - flag_num = flag_num +1 - if vlan_id == 2: - if flag_num == 0: - vlan_success = vlan_success + 1 - else: - for id in range(4,10): - if vlan_id == id: - if flag_num == 0: - vlan_success = vlan_success + 1 - if vlan_id >= 101: - if flag_num == 2: - vlan_success = vlan_success + 1 - if vlan_success >= 7: - self.assertTrue(True) - else: - self.assertTrue(False) + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) - else: - self.assertTrue(False) + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) - # check member interfaces are added on the bridge + for member in self._members: + _check_vlan_filter(member, vifs) + + # change member interface description to trigger config update, + # VLANs must still exist (T4565) for interface in self._interfaces: - bridge_members = [] - for tmp in glob(f'/sys/class/net/{interface}/lower_*'): - bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + for member in self._members: + self.cli_set(['interfaces', Section.section(member), member, 'description', f'foo {member}']) + + # commit config + self.cli_commit() + + # Obtain status information and verify proper VLAN filter setup. + # First check if all members are present, second check if all VLANs + # are assigned on the parend bridge interface, third verify all the + # VLANs are properly setup on the downstream "member" ports + for interface in self._interfaces: + # check member interfaces are added on the bridge + _verify_members(interface, self._members) + + # Check if all VLAN ids are properly set up. Bridge interface always + # has native VLAN 1 + tmp = deepcopy(vifs) + tmp.append('1') + _check_vlan_filter(interface, tmp) for member in self._members: - self.assertIn(member, bridge_members) + _check_vlan_filter(member, vifs) # delete all members for interface in self._interfaces: self.cli_delete(self._base_path + [interface, 'member']) + # commit config + self.cli_commit() + + # verify member interfaces are no longer assigned on the bridge + for interface in self._interfaces: + bridge_members = [] + for tmp in glob(f'/sys/class/net/{interface}/lower_*'): + bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + + self.assertNotEqual(len(self._members), len(bridge_members)) + for member in self._members: + self.assertNotIn(member, bridge_members) - def test_bridge_vlan_members(self): + def test_bridge_vif_members(self): # T2945: ensure that VIFs are not dropped from bridge vifs = ['300', '400'] for interface in self._interfaces: @@ -255,5 +366,34 @@ class BridgeInterfaceTest(BasicInterfaceTest.TestCase): self.cli_delete(['interfaces', 'ethernet', member, 'vif', vif]) self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) + def test_bridge_vif_s_vif_c_members(self): + # T2945: ensure that VIFs are not dropped from bridge + vifs = ['300', '400'] + vifc = ['301', '401'] + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + for vif_c in vifc: + self.cli_set(['interfaces', 'ethernet', member, 'vif-s', vif_s, 'vif-c', vif_c]) + self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}']) + + self.cli_commit() + + # Verify config + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + for vif_c in vifc: + # member interface must be assigned to the bridge + self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif_s}.{vif_c}')) + + # delete all members + for interface in self._interfaces: + for member in self._members: + for vif_s in vifs: + self.cli_delete(['interfaces', 'ethernet', member, 'vif-s', vif_s]) + for vif_c in vifc: + self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index 05d2ae5f5..ed611062a 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -17,6 +17,7 @@ import os import re import unittest +from glob import glob from netifaces import AF_INET from netifaces import AF_INET6 @@ -119,15 +120,13 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase): cls._base_path = ['interfaces', 'ethernet'] cls._mirror_interfaces = ['dum21354'] - # we need to filter out VLAN interfaces identified by a dot (.) - # in their name - just in case! + # We only test on physical interfaces and not VLAN (sub-)interfaces if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: - for tmp in Section.interfaces('ethernet'): - if not '.' in tmp: - cls._interfaces.append(tmp) + for tmp in Section.interfaces('ethernet', vlan=False): + cls._interfaces.append(tmp) cls._macs = {} for interface in cls._interfaces: @@ -185,6 +184,39 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase): self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}') + def test_offloading_rfs(self): + global_rfs_flow = 32768 + rfs_flow = global_rfs_flow + + for interface in self._interfaces: + self.cli_set(self._base_path + [interface, 'offload', 'rfs']) + + self.cli_commit() + + for interface in self._interfaces: + queues = len(glob(f'/sys/class/net/{interface}/queues/rx-*')) + rfs_flow = int(global_rfs_flow/queues) + for i in range(0, queues): + tmp = read_file(f'/sys/class/net/{interface}/queues/rx-{i}/rps_flow_cnt') + self.assertEqual(int(tmp), rfs_flow) + + tmp = read_file(f'/proc/sys/net/core/rps_sock_flow_entries') + self.assertEqual(int(tmp), global_rfs_flow) + + # delete configuration of RFS and check all values returned to default "0" + for interface in self._interfaces: + self.cli_delete(self._base_path + [interface, 'offload', 'rfs']) + + self.cli_commit() + + for interface in self._interfaces: + queues = len(glob(f'/sys/class/net/{interface}/queues/rx-*')) + rfs_flow = int(global_rfs_flow/queues) + for i in range(0, queues): + tmp = read_file(f'/sys/class/net/{interface}/queues/rx-{i}/rps_flow_cnt') + self.assertEqual(int(tmp), 0) + + def test_non_existing_interface(self): unknonw_interface = self._base_path + ['eth667'] self.cli_set(unknonw_interface) diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index e5e5a558e..f141cc6d3 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -28,6 +28,8 @@ from vyos.util import read_file from vyos.util import get_interface_config from vyos.util import process_named_running +PROCESS_NAME = 'wpa_supplicant' + def get_config_value(interface, key): tmp = read_file(f'/run/wpa_supplicant/{interface}.conf') tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) @@ -55,6 +57,10 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # call base-classes classmethod super(MACsecInterfaceTest, cls).setUpClass() + def tearDown(self): + super().tearDown() + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_macsec_encryption(self): # MACsec can be operating in authentication and encryption mode - both # using different mandatory settings, lets test encryption as the basic @@ -96,28 +102,29 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() tmp = get_config_value(src_interface, 'macsec_integ_only') - self.assertTrue("0" in tmp) + self.assertIn("0", tmp) tmp = get_config_value(src_interface, 'mka_cak') - self.assertTrue(mak_cak in tmp) + self.assertIn(mak_cak, tmp) tmp = get_config_value(src_interface, 'mka_ckn') - self.assertTrue(mak_ckn in tmp) + self.assertIn(mak_ckn, tmp) # check that the default priority of 255 is programmed tmp = get_config_value(src_interface, 'mka_priority') - self.assertTrue("255" in tmp) + self.assertIn("255", tmp) tmp = get_config_value(src_interface, 'macsec_replay_window') - self.assertTrue(replay_window in tmp) + self.assertIn(replay_window, tmp) tmp = read_file(f'/sys/class/net/{interface}/mtu') self.assertEqual(tmp, '1460') - # Check for running process - self.assertTrue(process_named_running('wpa_supplicant')) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) def test_macsec_gcm_aes_128(self): + src_interface = 'eth0' interface = 'macsec1' cipher = 'gcm-aes-128' self.cli_set(self._base_path + [interface]) @@ -125,7 +132,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): @@ -138,7 +145,15 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) + # check that we use the new macsec_csindex option (T4537) + tmp = get_config_value(src_interface, 'macsec_csindex') + self.assertIn("0", tmp) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + def test_macsec_gcm_aes_256(self): + src_interface = 'eth0' interface = 'macsec4' cipher = 'gcm-aes-256' self.cli_set(self._base_path + [interface]) @@ -146,7 +161,7 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): @@ -158,6 +173,13 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) + # check that we use the new macsec_csindex option (T4537) + tmp = get_config_value(src_interface, 'macsec_csindex') + self.assertIn("1", tmp) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + def test_macsec_source_interface(self): # Ensure source-interface can bot be part of any other bond or bridge @@ -186,6 +208,9 @@ class MACsecInterfaceTest(BasicInterfaceTest.TestCase): self.cli_commit() self.assertIn(interface, interfaces()) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2, failfast=True) diff --git a/smoketest/scripts/cli/test_interfaces_virtual_ethernet.py b/smoketest/scripts/cli/test_interfaces_virtual_ethernet.py new file mode 100755 index 000000000..4732342fc --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_virtual_ethernet.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.ifconfig import Section +from base_interfaces_test import BasicInterfaceTest + +class VEthInterfaceTest(BasicInterfaceTest.TestCase): + @classmethod + def setUpClass(cls): + cls._test_dhcp = True + cls._base_path = ['interfaces', 'virtual-ethernet'] + + cls._options = { + 'veth0': ['peer-name veth1'], + 'veth1': ['peer-name veth0'], + } + + cls._interfaces = list(cls._options) + # call base-classes classmethod + super(VEthInterfaceTest, cls).setUpClass() + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_vti.py b/smoketest/scripts/cli/test_interfaces_vti.py new file mode 100755 index 000000000..9cbf104f0 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_vti.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest + +from base_interfaces_test import BasicInterfaceTest + +class VTIInterfaceTest(BasicInterfaceTest.TestCase): + @classmethod + def setUpClass(cls): + cls._test_ip = True + cls._test_ipv6 = True + cls._test_mtu = True + cls._base_path = ['interfaces', 'vti'] + cls._interfaces = ['vti10', 'vti20', 'vti30'] + + # call base-classes classmethod + super(VTIInterfaceTest, cls).setUpClass() + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_wireguard.py b/smoketest/scripts/cli/test_interfaces_wireguard.py index f3e9670f7..14fc8d109 100755 --- a/smoketest/scripts/cli/test_interfaces_wireguard.py +++ b/smoketest/scripts/cli/test_interfaces_wireguard.py @@ -62,10 +62,10 @@ class WireGuardInterfaceTest(VyOSUnitTestSHIM.TestCase): self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) - def test_wireguard_add_remove_peer(self): # T2939: Create WireGuard interfaces with associated peers. # Remove one of the configured peers. + # T4774: Test prevention of duplicate peer public keys interface = 'wg0' port = '12345' privkey = '6ISOkASm6VhHOOSz/5iIxw+Q9adq9zA17iMM4X40dlc=' @@ -80,11 +80,17 @@ class WireGuardInterfaceTest(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32']) self.cli_set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1']) - self.cli_set(base_path + [interface, 'peer', 'PEER02', 'public-key', pubkey_2]) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'public-key', pubkey_1]) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'port', port]) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32']) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2']) + # Duplicate pubkey_1 + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'public-key', pubkey_2]) + # Commit peers self.cli_commit() diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py index 8e54f66a3..23020b9b1 100755 --- a/smoketest/scripts/cli/test_load_balancning_wan.py +++ b/smoketest/scripts/cli/test_load_balancning_wan.py @@ -64,28 +64,39 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): ns1 = 'ns201' ns2 = 'ns202' + ns3 = 'ns203' iface1 = 'eth201' iface2 = 'eth202' + iface3 = 'eth203' container_iface1 = 'ceth0' container_iface2 = 'ceth1' + container_iface3 = 'ceth2' # Create network namespeces create_netns(ns1) create_netns(ns2) + create_netns(ns3) create_veth_pair(iface1, container_iface1) create_veth_pair(iface2, container_iface2) + create_veth_pair(iface3, container_iface3) move_interface_to_netns(container_iface1, ns1) move_interface_to_netns(container_iface2, ns2) + move_interface_to_netns(container_iface3, ns3) call(f'sudo ip address add 203.0.113.10/24 dev {iface1}') call(f'sudo ip address add 192.0.2.10/24 dev {iface2}') + call(f'sudo ip address add 198.51.100.10/24 dev {iface3}') call(f'sudo ip link set dev {iface1} up') call(f'sudo ip link set dev {iface2} up') + call(f'sudo ip link set dev {iface3} up') cmd_in_netns(ns1, f'ip link set {container_iface1} name eth0') cmd_in_netns(ns2, f'ip link set {container_iface2} name eth0') + cmd_in_netns(ns3, f'ip link set {container_iface3} name eth0') cmd_in_netns(ns1, 'ip address add 203.0.113.1/24 dev eth0') cmd_in_netns(ns2, 'ip address add 192.0.2.1/24 dev eth0') + cmd_in_netns(ns3, 'ip address add 198.51.100.1/24 dev eth0') cmd_in_netns(ns1, 'ip link set dev eth0 up') cmd_in_netns(ns2, 'ip link set dev eth0 up') + cmd_in_netns(ns3, 'ip link set dev eth0 up') # Set load-balancing configuration self.cli_set(base_path + ['wan', 'interface-health', iface1, 'failure-count', '2']) @@ -95,6 +106,10 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['wan', 'interface-health', iface2, 'nexthop', '192.0.2.1']) self.cli_set(base_path + ['wan', 'interface-health', iface2, 'success-count', '1']) + self.cli_set(base_path + ['wan', 'rule', '10', 'inbound-interface', iface3]) + self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'address', '198.51.100.0/24']) + + # commit changes self.cli_commit() @@ -120,10 +135,13 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): ns1 = 'nsA' ns2 = 'nsB' + ns3 = 'nsC' iface1 = 'veth1' iface2 = 'veth2' + iface3 = 'veth3' container_iface1 = 'ceth0' container_iface2 = 'ceth1' + container_iface3 = 'ceth2' mangle_isp1 = """table ip mangle { chain ISP_veth1 { counter ct mark set 0xc9 @@ -138,24 +156,58 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): counter accept } }""" + mangle_prerouting = """table ip mangle { + chain PREROUTING { + type filter hook prerouting priority mangle; policy accept; + counter jump WANLOADBALANCE_PRE + } +}""" + mangle_wanloadbalance_pre = """table ip mangle { + chain WANLOADBALANCE_PRE { + iifname "veth3" ip saddr 198.51.100.0/24 ct state new counter jump ISP_veth1 + iifname "veth3" ip saddr 198.51.100.0/24 ct state new counter jump ISP_veth2 + iifname "veth3" ip saddr 198.51.100.0/24 counter meta mark set ct mark + } +}""" + nat_wanloadbalance = """table ip nat { + chain WANLOADBALANCE { + ct mark 0xc9 counter snat to 203.0.113.10 + ct mark 0xca counter snat to 192.0.2.10 + } +}""" + nat_vyos_pre_snat_hook = """table ip nat { + chain VYOS_PRE_SNAT_HOOK { + type nat hook postrouting priority srcnat - 1; policy accept; + counter jump WANLOADBALANCE + return + } +}""" # Create network namespeces create_netns(ns1) create_netns(ns2) + create_netns(ns3) create_veth_pair(iface1, container_iface1) create_veth_pair(iface2, container_iface2) + create_veth_pair(iface3, container_iface3) move_interface_to_netns(container_iface1, ns1) move_interface_to_netns(container_iface2, ns2) + move_interface_to_netns(container_iface3, ns3) call(f'sudo ip address add 203.0.113.10/24 dev {iface1}') call(f'sudo ip address add 192.0.2.10/24 dev {iface2}') + call(f'sudo ip address add 198.51.100.10/24 dev {iface3}') call(f'sudo ip link set dev {iface1} up') call(f'sudo ip link set dev {iface2} up') + call(f'sudo ip link set dev {iface3} up') cmd_in_netns(ns1, f'ip link set {container_iface1} name eth0') cmd_in_netns(ns2, f'ip link set {container_iface2} name eth0') + cmd_in_netns(ns3, f'ip link set {container_iface3} name eth0') cmd_in_netns(ns1, 'ip address add 203.0.113.1/24 dev eth0') cmd_in_netns(ns2, 'ip address add 192.0.2.1/24 dev eth0') + cmd_in_netns(ns3, 'ip address add 198.51.100.1/24 dev eth0') cmd_in_netns(ns1, 'ip link set dev eth0 up') cmd_in_netns(ns2, 'ip link set dev eth0 up') + cmd_in_netns(ns3, 'ip link set dev eth0 up') # Set load-balancing configuration self.cli_set(base_path + ['wan', 'interface-health', iface1, 'failure-count', '2']) @@ -164,19 +216,36 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['wan', 'interface-health', iface2, 'failure-count', '2']) self.cli_set(base_path + ['wan', 'interface-health', iface2, 'nexthop', '192.0.2.1']) self.cli_set(base_path + ['wan', 'interface-health', iface2, 'success-count', '1']) + self.cli_set(base_path + ['wan', 'rule', '10', 'inbound-interface', iface3]) + self.cli_set(base_path + ['wan', 'rule', '10', 'source', 'address', '198.51.100.0/24']) + self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface1]) + self.cli_set(base_path + ['wan', 'rule', '10', 'interface', iface2]) # commit changes self.cli_commit() time.sleep(5) - # Check chains - #call('sudo nft list ruleset') + + # Check mangle chains tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface1}') self.assertEqual(tmp, mangle_isp1) tmp = cmd(f'sudo nft -s list chain mangle ISP_{iface2}') self.assertEqual(tmp, mangle_isp2) + tmp = cmd(f'sudo nft -s list chain mangle PREROUTING') + self.assertEqual(tmp, mangle_prerouting) + + tmp = cmd(f'sudo nft -s list chain mangle WANLOADBALANCE_PRE') + self.assertEqual(tmp, mangle_wanloadbalance_pre) + + # Check nat chains + tmp = cmd(f'sudo nft -s list chain nat WANLOADBALANCE') + self.assertEqual(tmp, nat_wanloadbalance) + + tmp = cmd(f'sudo nft -s list chain nat VYOS_PRE_SNAT_HOOK') + self.assertEqual(tmp, nat_vyos_pre_snat_hook) + # Delete veth interfaces and netns for iface in [iface1, iface2]: call(f'sudo ip link del dev {iface}') diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 408facfb3..9f4e3b831 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -16,6 +16,7 @@ import jmespath import json +import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM @@ -26,6 +27,10 @@ from vyos.util import dict_search base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] +static_path = base_path + ['static'] + +nftables_nat_config = '/run/nftables_nat.conf' +nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' class TestNAT(VyOSUnitTestSHIM.TestCase): @classmethod @@ -39,11 +44,38 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): def tearDown(self): self.cli_delete(base_path) self.cli_commit() + self.assertFalse(os.path.exists(nftables_nat_config)) + self.assertFalse(os.path.exists(nftables_static_nat_conf)) + + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): + # Resolver no longer blocks commit, need to wait for daemon to populate set + count = 0 + while count < max_wait: + code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') + if code == 0: + return True + count += 1 + sleep(1) + return False def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] outbound_iface_100 = 'eth0' outbound_iface_200 = 'eth1' + + nftables_search = ['jump VYOS_PRE_SNAT_HOOK'] + for rule in rules: network = f'192.168.{rule}.0/24' # depending of rule order we check either for source address for NAT @@ -52,51 +84,40 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade']) else: self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) self.cli_set(src_path + ['rule', rule, 'exclude']) + nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return']) self.cli_commit() - tmp = cmd('sudo nft -j list chain ip nat POSTROUTING') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + self.verify_nftables(nftables_search, 'ip vyos_nat') - for idx in range(0, len(data_json)): - data = data_json[idx] - if idx == 0: - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') + def test_snat_groups(self): + address_group = 'smoketest_addr' + address_group_member = '192.0.2.1' + rule = '100' + outbound_iface = 'eth0' - jump_target = dict_search('jump.target', data['expr'][1]) - self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK') - else: - rule = str(rules[idx - 1]) - network = f'192.168.{rule}.0/24' - - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['comment'], f'SRC-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - address = dict_search('match.right.prefix.addr', data['expr'][1]) - mask = dict_search('match.right.prefix.len', data['expr'][1]) - - if int(rule) < 200: - self.assertEqual(direction, 'saddr') - self.assertEqual(iface, outbound_iface_100) - # check for masquerade keyword - self.assertIn('masquerade', data['expr'][3]) - else: - self.assertEqual(direction, 'daddr') - self.assertEqual(iface, outbound_iface_200) - # check for return keyword due to 'exclude' - self.assertIn('return', data['expr'][3]) - - self.assertEqual(f'{address}/{mask}', network) + self.cli_set(['firewall', 'group', 'address-group', address_group, 'address', address_group_member]) + + self.cli_set(src_path + ['rule', rule, 'source', 'group', 'address-group', address_group]) + self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface]) + self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + + self.cli_commit() + + nftables_search = [ + [f'set A_{address_group}'], + [f'elements = {{ {address_group_member} }}'], + [f'ip saddr @A_{address_group}', f'oifname "{outbound_iface}"', 'masquerade'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_nat') + + self.cli_delete(['firewall']) def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] @@ -105,56 +126,29 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): inbound_proto_100 = 'udp' inbound_proto_200 = 'tcp' + nftables_search = ['jump VYOS_PRE_DNAT_HOOK'] + for rule in rules: port = f'10{rule}' self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) + rule_search = [f'dnat to 192.0.2.1:{port}'] if int(rule) < 200: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) + rule_search.append(f'{inbound_proto_100} sport {port}') + rule_search.append(f'iifname "{inbound_iface_100}"') else: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) + rule_search.append(f'iifname "{inbound_iface_200}"') - self.cli_commit() - - tmp = cmd('sudo nft -j list chain ip nat PREROUTING') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + nftables_search.append(rule_search) - for idx in range(0, len(data_json)): - data = data_json[idx] - if idx == 0: - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - jump_target = dict_search('jump.target', data['expr'][1]) - self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK') - else: + self.cli_commit() - rule = str(rules[idx - 1]) - port = int(f'10{rule}') - - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - protocol = dict_search('match.left.payload.protocol', data['expr'][1]) - dnat_addr = dict_search('dnat.addr', data['expr'][3]) - dnat_port = dict_search('dnat.port', data['expr'][3]) - - self.assertEqual(direction, 'sport') - self.assertEqual(dnat_addr, '192.0.2.1') - self.assertEqual(dnat_port, port) - if int(rule) < 200: - self.assertEqual(iface, inbound_iface_100) - self.assertEqual(protocol, inbound_proto_100) - else: - self.assertEqual(iface, inbound_iface_200) + self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified @@ -193,8 +187,48 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): # without any rule self.cli_set(src_path) self.cli_set(dst_path) + self.cli_set(static_path) + self.cli_commit() + + def test_dnat_without_translation_address(self): + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) + self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) + + self.cli_commit() + + nftables_search = [ + ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_nat') + + def test_static_nat(self): + dst_addr_1 = '10.0.1.1' + translate_addr_1 = '192.168.1.1' + dst_addr_2 = '203.0.113.0/24' + translate_addr_2 = '192.0.2.0/24' + ifname = 'eth0' + + self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1]) + self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname]) + self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1]) + + self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2]) + self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname]) + self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2]) + self.cli_commit() + nftables_search = [ + [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'], + [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'], + [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'], + [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_static_nat') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index aac6a30f9..50806b3e8 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -42,6 +42,17 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) self.cli_commit() + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + def test_source_nat66(self): source_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' @@ -49,29 +60,23 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_prefix]) - # check validate() - outbound-interface must be defined - self.cli_commit() + self.cli_set(src_path + ['rule', '2', 'outbound-interface', 'eth1']) + self.cli_set(src_path + ['rule', '2', 'source', 'prefix', source_prefix]) + self.cli_set(src_path + ['rule', '2', 'translation', 'address', 'masquerade']) - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + self.cli_set(src_path + ['rule', '3', 'outbound-interface', 'eth1']) + self.cli_set(src_path + ['rule', '3', 'source', 'prefix', source_prefix]) + self.cli_set(src_path + ['rule', '3', 'exclude']) - for idx in range(0, len(data_json)): - data = data_json[idx] - - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') + self.cli_commit() - iface = dict_search('match.right', data['expr'][0]) - address = dict_search('match.right.prefix.addr', data['expr'][2]) - mask = dict_search('match.right.prefix.len', data['expr'][2]) - translation_address = dict_search('snat.addr.prefix.addr', data['expr'][3]) - translation_mask = dict_search('snat.addr.prefix.len', data['expr'][3]) + nftables_search = [ + ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'], + ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'], + ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return'] + ] - self.assertEqual(iface, 'eth1') - # check for translation address - self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix) - self.assertEqual(f'{address}/{mask}', source_prefix) + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_address(self): source_prefix = 'fc00::/64' @@ -83,51 +88,58 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): # check validate() - outbound-interface must be defined self.cli_commit() - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) - - for idx in range(0, len(data_json)): - data = data_json[idx] + nftables_search = [ + ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}'] + ] - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - address = dict_search('match.right.prefix.addr', data['expr'][2]) - mask = dict_search('match.right.prefix.len', data['expr'][2]) - snat_address = dict_search('snat.addr', data['expr'][3]) - - self.assertEqual(iface, 'eth1') - # check for translation address - self.assertEqual(snat_address, translation_address) - self.assertEqual(f'{address}/{mask}', source_prefix) + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66(self): destination_address = 'fc00::1' translation_address = 'fc01::1' + source_address = 'fc02::1' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_address]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) + self.cli_set(dst_path + ['rule', '2', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '2', 'destination', 'address', destination_address]) + self.cli_set(dst_path + ['rule', '2', 'source', 'address', source_address]) + self.cli_set(dst_path + ['rule', '2', 'exclude']) + # check validate() - outbound-interface must be defined self.cli_commit() - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + nftables_search = [ + ['iifname "eth1"', 'ip6 daddr fc00::1', 'dnat to fc01::1'], + ['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return'] + ] - for idx in range(0, len(data_json)): - data = data_json[idx] + self.verify_nftables(nftables_search, 'ip6 vyos_nat') - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') + def test_destination_nat66_protocol(self): + translation_address = '2001:db8:1111::1' + source_prefix = '2001:db8:2222::/64' + dport = '4545' + sport = '8080' + tport = '5555' + proto = 'tcp' + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dport]) + self.cli_set(dst_path + ['rule', '1', 'source', 'address', source_prefix]) + self.cli_set(dst_path + ['rule', '1', 'source', 'port', sport]) + self.cli_set(dst_path + ['rule', '1', 'protocol', proto]) + self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', tport]) - iface = dict_search('match.right', data['expr'][0]) - dnat_addr = dict_search('dnat.addr', data['expr'][3]) + # check validate() - outbound-interface must be defined + self.cli_commit() - self.assertEqual(dnat_addr, translation_address) - self.assertEqual(iface, 'eth1') + nftables_search = [ + ['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to [2001:db8:1111::1]:5555'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_prefix(self): destination_prefix = 'fc00::/64' @@ -139,22 +151,25 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): # check validate() - outbound-interface must be defined self.cli_commit() - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + nftables_search = [ + ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_nat') - for idx in range(0, len(data_json)): - data = data_json[idx] + def test_destination_nat66_without_translation_address(self): + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) + self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') + self.cli_commit() - iface = dict_search('match.right', data['expr'][0]) - translation_address = dict_search('dnat.addr.prefix.addr', data['expr'][3]) - translation_mask = dict_search('dnat.addr.prefix.len', data['expr'][3]) + nftables_search = [ + ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] + ] - self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix) - self.assertEqual(iface, 'eth1') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_required_translation_prefix(self): # T2813: Ensure translation address is specified @@ -174,6 +189,30 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() + def test_source_nat66_protocol(self): + translation_address = '2001:db8:1111::1' + source_prefix = '2001:db8:2222::/64' + dport = '9999' + sport = '8080' + tport = '80' + proto = 'tcp' + self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'eth1']) + self.cli_set(src_path + ['rule', '1', 'destination', 'port', dport]) + self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) + self.cli_set(src_path + ['rule', '1', 'source', 'port', sport]) + self.cli_set(src_path + ['rule', '1', 'protocol', proto]) + self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) + self.cli_set(src_path + ['rule', '1', 'translation', 'port', tport]) + + # check validate() - outbound-interface must be defined + self.cli_commit() + + nftables_search = [ + ['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to [2001:db8:1111::1]:80'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_nat') + def test_nat66_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. diff --git a/smoketest/scripts/cli/test_pki.py b/smoketest/scripts/cli/test_pki.py index cba5ffdde..b18b0b039 100755 --- a/smoketest/scripts/cli/test_pki.py +++ b/smoketest/scripts/cli/test_pki.py @@ -246,5 +246,27 @@ class TestPKI(VyOSUnitTestSHIM.TestCase): self.cli_delete(['service', 'https', 'certificates', 'certificate']) + def test_certificate_eapol_update(self): + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_ca_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_ca_private_key.replace('\n','')]) + self.cli_commit() + + self.cli_set(['interfaces', 'ethernet', 'eth1', 'eapol', 'certificate', 'smoketest']) + self.cli_commit() + + cert_data = None + + with open('/run/wpa_supplicant/eth1_cert.pem') as f: + cert_data = f.read() + + self.cli_set(base_path + ['certificate', 'smoketest', 'certificate', valid_update_cert.replace('\n','')]) + self.cli_set(base_path + ['certificate', 'smoketest', 'private', 'key', valid_update_private_key.replace('\n','')]) + self.cli_commit() + + with open('/run/wpa_supplicant/eth1_cert.pem') as f: + self.assertNotEqual(cert_data, f.read()) + + self.cli_delete(['interfaces', 'ethernet', 'eth1', 'eapol']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py index f175d7df7..3a4ef666a 100755 --- a/smoketest/scripts/cli/test_policy.py +++ b/smoketest/scripts/cli/test_policy.py @@ -698,6 +698,184 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): for rule in test_range: tmp = f'ip prefix-list {prefix_list} seq {rule} permit {prefix} le {rule}' self.assertIn(tmp, config) + def test_route_map_community_set(self): + test_data = { + "community-configuration": { + "rule": { + "10": { + "action": "permit", + "set": { + "community": { + "replace": [ + "65000:10", + "65001:11" + ] + }, + "extcommunity": { + "bandwidth": "200", + "rt": [ + "65000:10", + "192.168.0.1:11" + ], + "soo": [ + "192.168.0.1:11", + "65000:10" + ] + }, + "large-community": { + "replace": [ + "65000:65000:10", + "65000:65000:11" + ] + } + } + }, + "20": { + "action": "permit", + "set": { + "community": { + "add": [ + "65000:10", + "65001:11" + ] + }, + "extcommunity": { + "bandwidth": "200", + "bandwidth-non-transitive": {} + }, + "large-community": { + "add": [ + "65000:65000:10", + "65000:65000:11" + ] + } + } + }, + "30": { + "action": "permit", + "set": { + "community": { + "none": {} + }, + "extcommunity": { + "none": {} + }, + "large-community": { + "none": {} + } + } + } + } + } + } + for route_map, route_map_config in test_data.items(): + path = base_path + ['route-map', route_map] + self.cli_set(path + ['description', f'VyOS ROUTE-MAP {route_map}']) + if 'rule' not in route_map_config: + continue + + for rule, rule_config in route_map_config['rule'].items(): + if 'action' in rule_config: + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) + if 'set' in rule_config: + + #Add community in configuration + if 'community' in rule_config['set']: + if 'none' in rule_config['set']['community']: + self.cli_set(path + ['rule', rule, 'set', 'community', 'none']) + else: + community_path = path + ['rule', rule, 'set', 'community'] + if 'add' in rule_config['set']['community']: + for community_unit in rule_config['set']['community']['add']: + self.cli_set(community_path + ['add', community_unit]) + if 'replace' in rule_config['set']['community']: + for community_unit in rule_config['set']['community']['replace']: + self.cli_set(community_path + ['replace', community_unit]) + + #Add large-community in configuration + if 'large-community' in rule_config['set']: + if 'none' in rule_config['set']['large-community']: + self.cli_set(path + ['rule', rule, 'set', 'large-community', 'none']) + else: + community_path = path + ['rule', rule, 'set', 'large-community'] + if 'add' in rule_config['set']['large-community']: + for community_unit in rule_config['set']['large-community']['add']: + self.cli_set(community_path + ['add', community_unit]) + if 'replace' in rule_config['set']['large-community']: + for community_unit in rule_config['set']['large-community']['replace']: + self.cli_set(community_path + ['replace', community_unit]) + + #Add extcommunity in configuration + if 'extcommunity' in rule_config['set']: + if 'none' in rule_config['set']['extcommunity']: + self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'none']) + else: + if 'bandwidth' in rule_config['set']['extcommunity']: + self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'bandwidth', rule_config['set']['extcommunity']['bandwidth']]) + if 'bandwidth-non-transitive' in rule_config['set']['extcommunity']: + self.cli_set(path + ['rule', rule, 'set','extcommunity', 'bandwidth-non-transitive']) + if 'rt' in rule_config['set']['extcommunity']: + for community_unit in rule_config['set']['extcommunity']['rt']: + self.cli_set(path + ['rule', rule, 'set', 'extcommunity','rt',community_unit]) + if 'soo' in rule_config['set']['extcommunity']: + for community_unit in rule_config['set']['extcommunity']['soo']: + self.cli_set(path + ['rule', rule, 'set', 'extcommunity','soo',community_unit]) + self.cli_commit() + + for route_map, route_map_config in test_data.items(): + if 'rule' not in route_map_config: + continue + for rule, rule_config in route_map_config['rule'].items(): + name = f'route-map {route_map} {rule_config["action"]} {rule}' + config = self.getFRRconfig(name) + self.assertIn(name, config) + + if 'set' in rule_config: + #Check community + if 'community' in rule_config['set']: + if 'none' in rule_config['set']['community']: + tmp = f'set community none' + self.assertIn(tmp, config) + if 'replace' in rule_config['set']['community']: + values = ' '.join(rule_config['set']['community']['replace']) + tmp = f'set community {values}' + self.assertIn(tmp, config) + if 'add' in rule_config['set']['community']: + values = ' '.join(rule_config['set']['community']['add']) + tmp = f'set community {values} additive' + self.assertIn(tmp, config) + #Check large-community + if 'large-community' in rule_config['set']: + if 'none' in rule_config['set']['large-community']: + tmp = f'set large-community none' + self.assertIn(tmp, config) + if 'replace' in rule_config['set']['large-community']: + values = ' '.join(rule_config['set']['large-community']['replace']) + tmp = f'set large-community {values}' + self.assertIn(tmp, config) + if 'add' in rule_config['set']['large-community']: + values = ' '.join(rule_config['set']['large-community']['add']) + tmp = f'set large-community {values} additive' + self.assertIn(tmp, config) + #Check extcommunity + if 'extcommunity' in rule_config['set']: + if 'none' in rule_config['set']['extcommunity']: + tmp = 'set extcommunity none' + self.assertIn(tmp, config) + if 'bandwidth' in rule_config['set']['extcommunity']: + values = rule_config['set']['extcommunity']['bandwidth'] + tmp = f'set extcommunity bandwidth {values}' + if 'bandwidth-non-transitive' in rule_config['set']['extcommunity']: + tmp = tmp + ' non-transitive' + self.assertIn(tmp, config) + if 'rt' in rule_config['set']['extcommunity']: + values = ' '.join(rule_config['set']['extcommunity']['rt']) + tmp = f'set extcommunity rt {values}' + self.assertIn(tmp, config) + if 'soo' in rule_config['set']['extcommunity']: + values = ' '.join(rule_config['set']['extcommunity']['soo']) + tmp = f'set extcommunity soo {values}' + self.assertIn(tmp, config) def test_route_map(self): access_list = '50' @@ -715,6 +893,7 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): local_pref = '300' metric = '50' peer = '2.3.4.5' + peerv6 = '2001:db8::1' tag = '6542' goto = '25' @@ -723,7 +902,6 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): ipv6_prefix_len= '122' ipv4_nexthop_type= 'blackhole' ipv6_nexthop_type= 'blackhole' - test_data = { 'foo-map-bar' : { @@ -804,6 +982,14 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): 'peer' : peer, }, }, + + '31' : { + 'action' : 'permit', + 'match' : { + 'peer' : peerv6, + }, + }, + '40' : { 'action' : 'permit', 'match' : { @@ -837,17 +1023,14 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): 'as-path-prepend-last-as' : '5', 'atomic-aggregate' : '', 'distance' : '110', - 'extcommunity-bw' : '20000', - 'extcommunity-rt' : '123:456', - 'extcommunity-soo' : '456:789', 'ipv6-next-hop-global' : '2001::1', 'ipv6-next-hop-local' : 'fe80::1', 'ip-next-hop' : '192.168.1.1', - 'large-community' : '100:200:300', 'local-preference' : '500', 'metric' : '150', 'metric-type' : 'type-1', 'origin' : 'incomplete', + 'l3vpn' : '', 'originator-id' : '172.16.10.1', 'src' : '100.0.0.1', 'tag' : '65530', @@ -888,6 +1071,28 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): }, }, }, + 'relative-metric' : { + 'rule' : { + '10' : { + 'action' : 'permit', + 'match' : { + 'ip-nexthop-addr' : ipv4_nexthop_address, + }, + 'set' : { + 'metric' : '+10', + }, + }, + '20' : { + 'action' : 'permit', + 'match' : { + 'ip-nexthop-addr' : ipv4_nexthop_address, + }, + 'set' : { + 'metric' : '-20', + }, + }, + }, + }, } self.cli_set(['policy', 'access-list', access_list, 'rule', '10', 'action', 'permit']) @@ -1019,20 +1224,14 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): self.cli_set(path + ['rule', rule, 'set', 'atomic-aggregate']) if 'distance' in rule_config['set']: self.cli_set(path + ['rule', rule, 'set', 'distance', rule_config['set']['distance']]) - if 'extcommunity-bw' in rule_config['set']: - self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'bandwidth', rule_config['set']['extcommunity-bw']]) - if 'extcommunity-rt' in rule_config['set']: - self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'rt', rule_config['set']['extcommunity-rt']]) - if 'extcommunity-soo' in rule_config['set']: - self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'soo', rule_config['set']['extcommunity-soo']]) if 'ipv6-next-hop-global' in rule_config['set']: self.cli_set(path + ['rule', rule, 'set', 'ipv6-next-hop', 'global', rule_config['set']['ipv6-next-hop-global']]) if 'ipv6-next-hop-local' in rule_config['set']: self.cli_set(path + ['rule', rule, 'set', 'ipv6-next-hop', 'local', rule_config['set']['ipv6-next-hop-local']]) if 'ip-next-hop' in rule_config['set']: self.cli_set(path + ['rule', rule, 'set', 'ip-next-hop', rule_config['set']['ip-next-hop']]) - if 'large-community' in rule_config['set']: - self.cli_set(path + ['rule', rule, 'set', 'large-community', rule_config['set']['large-community']]) + if 'l3vpn' in rule_config['set']: + self.cli_set(path + ['rule', rule, 'set', 'l3vpn-nexthop', 'encapsulation', 'gre']) if 'local-preference' in rule_config['set']: self.cli_set(path + ['rule', rule, 'set', 'local-preference', rule_config['set']['local-preference']]) if 'metric' in rule_config['set']: @@ -1206,20 +1405,14 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase): tmp += 'atomic-aggregate' elif 'distance' in rule_config['set']: tmp += 'distance ' + rule_config['set']['distance'] - elif 'extcommunity-bw' in rule_config['set']: - tmp += 'extcommunity bandwidth' + rule_config['set']['extcommunity-bw'] - elif 'extcommunity-rt' in rule_config['set']: - tmp += 'extcommunity rt' + rule_config['set']['extcommunity-rt'] - elif 'extcommunity-soo' in rule_config['set']: - tmp += 'extcommunity rt' + rule_config['set']['extcommunity-soo'] elif 'ip-next-hop' in rule_config['set']: tmp += 'ip next-hop ' + rule_config['set']['ip-next-hop'] elif 'ipv6-next-hop-global' in rule_config['set']: tmp += 'ipv6 next-hop global ' + rule_config['set']['ipv6-next-hop-global'] elif 'ipv6-next-hop-local' in rule_config['set']: tmp += 'ipv6 next-hop local ' + rule_config['set']['ipv6-next-hop-local'] - elif 'large-community' in rule_config['set']: - tmp += 'large-community ' + rule_config['set']['large-community'] + elif 'l3vpn' in rule_config['set']: + tmp += 'l3vpn next-hop encapsulation gre' elif 'local-preference' in rule_config['set']: tmp += 'local-preference ' + rule_config['set']['local-preference'] elif 'metric' in rule_config['set']: diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py index e2d70f289..11b3c678e 100755 --- a/smoketest/scripts/cli/test_policy_route.py +++ b/smoketest/scripts/cli/test_policy_route.py @@ -42,17 +42,74 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): super(TestPolicyRoute, cls).tearDownClass() def tearDown(self): - self.cli_delete(['interfaces', 'ethernet', interface, 'policy']) self.cli_delete(['policy', 'route']) self.cli_delete(['policy', 'route6']) self.cli_commit() + # Verify nftables cleanup + nftables_search = [ + ['set N_smoketest_network'], + ['set N_smoketest_network1'], + ['chain VYOS_PBR_smoketest'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_mangle', inverse=True) + + # Verify ip rule cleanup + ip_rule_search = [ + ['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id] + ] + + self.verify_rules(ip_rule_search, inverse=True) + + def verify_nftables(self, nftables_search, table, inverse=False): + nftables_output = cmd(f'sudo nft list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def verify_rules(self, rules_search, inverse=False): + rule_output = cmd('ip rule show') + + for search in rules_search: + matched = False + for line in rule_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def test_pbr_group(self): + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) + + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'group', 'network-group', 'smoketest_network1']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) + self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) + + self.cli_commit() + + nftables_search = [ + [f'iifname "{interface}"','jump VYOS_PBR_smoketest'], + ['ip daddr @N_smoketest_network1', 'ip saddr @N_smoketest_network'], + ] + + self.verify_nftables(nftables_search, 'ip vyos_mangle') + + self.cli_delete(['firewall']) + def test_pbr_mark(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) - - self.cli_set(['interfaces', 'ethernet', interface, 'policy', 'route', 'smoketest']) + self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_commit() @@ -63,15 +120,7 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): ['ip daddr 172.16.10.10', 'ip saddr 172.16.20.10', 'meta mark set ' + mark_hex], ] - nftables_output = cmd('sudo nft list table ip mangle') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables_search, 'ip vyos_mangle') def test_pbr_table(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp']) @@ -83,8 +132,8 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'destination', 'port', '8888']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'set', 'table', table_id]) - self.cli_set(['interfaces', 'ethernet', interface, 'policy', 'route', 'smoketest']) - self.cli_set(['interfaces', 'ethernet', interface, 'policy', 'route6', 'smoketest6']) + self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) + self.cli_set(['policy', 'route6', 'smoketest6', 'interface', interface]) self.cli_commit() @@ -94,35 +143,19 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): nftables_search = [ [f'iifname "{interface}"', 'jump VYOS_PBR_smoketest'], - ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'meta mark set ' + mark_hex] + ['tcp flags syn / syn,ack', 'tcp dport 8888', 'meta mark set ' + mark_hex] ] - nftables_output = cmd('sudo nft list table ip mangle') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables_search, 'ip vyos_mangle') # IPv6 nftables6_search = [ [f'iifname "{interface}"', 'jump VYOS_PBR6_smoketest'], - ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'meta mark set ' + mark_hex] + ['meta l4proto { tcp, udp }', 'th dport 8888', 'meta mark set ' + mark_hex] ] - nftables6_output = cmd('sudo nft list table ip6 mangle') - - for search in nftables6_search: - matched = False - for line in nftables6_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables6_search, 'ip6 vyos_mangle') # IP rule fwmark -> table @@ -130,16 +163,84 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): ['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id] ] - ip_rule_output = cmd('ip rule show') + self.verify_rules(ip_rule_search) + + + def test_pbr_matching_criteria(self): + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'udp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'action', 'drop']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'protocol', 'tcp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'tcp', 'flags', 'syn']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'set', 'table', table_id]) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'source', 'address', '198.51.100.0/24']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'protocol', 'tcp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'destination', 'port', '22']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'state', 'new', 'enable']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'ttl', 'gt', '2']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'set', 'table', table_id]) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'protocol', 'icmp']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'icmp', 'type-name', 'echo-request']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'packet-length', '128']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'packet-length', '1024-2048']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'log', 'enable']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'set', 'table', table_id]) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'dscp', '41']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'dscp', '57-59']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'set', 'table', table_id]) + + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'protocol', 'udp']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'action', 'drop']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'protocol', 'tcp']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'tcp', 'flags', 'syn']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'set', 'table', table_id]) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'source', 'address', '2001:db8::0/64']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'protocol', 'tcp']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'destination', 'port', '22']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'state', 'new', 'enable']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'hop-limit', 'gt', '2']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'set', 'table', table_id]) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'protocol', 'icmpv6']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'icmpv6', 'type', 'echo-request']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'packet-length-exclude', '128']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'packet-length-exclude', '1024-2048']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'log', 'enable']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'set', 'table', table_id]) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'dscp-exclude', '61']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'dscp-exclude', '14-19']) + self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'set', 'table', table_id]) + + self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) + self.cli_set(['policy', 'route6', 'smoketest6', 'interface', interface]) - for search in ip_rule_search: - matched = False - for line in ip_rule_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.cli_commit() + + mark_hex = "{0:#010x}".format(table_mark_offset - int(table_id)) + + # IPv4 + nftables_search = [ + [f'iifname "{interface}"', 'jump VYOS_PBR_smoketest'], + ['meta l4proto udp', 'drop'], + ['tcp flags syn / syn,ack', 'meta mark set ' + mark_hex], + ['ct state new', 'tcp dport 22', 'ip saddr 198.51.100.0/24', 'ip ttl > 2', 'meta mark set ' + mark_hex], + ['meta l4proto icmp', 'log prefix "[smoketest-4-A]"', 'icmp type echo-request', 'ip length { 128, 1024-2048 }', 'meta mark set ' + mark_hex], + ['ip dscp { 0x29, 0x39-0x3b }', 'meta mark set ' + mark_hex] + ] + + self.verify_nftables(nftables_search, 'ip vyos_mangle') + + # IPv6 + nftables6_search = [ + [f'iifname "{interface}"', 'jump VYOS_PBR6_smoketest'], + ['meta l4proto udp', 'drop'], + ['tcp flags syn / syn,ack', 'meta mark set ' + mark_hex], + ['ct state new', 'tcp dport 22', 'ip6 saddr 2001:db8::/64', 'ip6 hoplimit > 2', 'meta mark set ' + mark_hex], + ['meta l4proto ipv6-icmp', 'log prefix "[smoketest6-4-A]"', 'icmpv6 type echo-request', 'ip6 length != { 128, 1024-2048 }', 'meta mark set ' + mark_hex], + ['ip6 dscp != { 0x0e-0x13, 0x3d }', 'meta mark set ' + mark_hex] + ] + self.verify_nftables(nftables6_search, 'ip6 vyos_mangle') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 9c0c93779..debc8270c 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -46,7 +46,7 @@ neighbor_config = { 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', - 'local_as' : '300', + 'system_as' : '300', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'no_send_comm_ext' : '', @@ -87,7 +87,7 @@ neighbor_config = { 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', - 'local_as' : '300', + 'system_as' : '300', 'solo' : '', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, @@ -105,7 +105,8 @@ neighbor_config = { 'pfx_list_out' : prefix_list_out6, 'no_send_comm_ext' : '', 'peer_group' : 'foo-bar_baz', - 'graceful_rst_hlp' : '' + 'graceful_rst_hlp' : '', + 'disable_conn_chk' : '', }, } @@ -120,6 +121,7 @@ peer_group_config = { 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', + 'disable_conn_chk' : '', }, 'bar' : { 'remote_as' : '111', @@ -131,7 +133,7 @@ peer_group_config = { 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', - 'local_as' : '300', + 'system_as' : '300', 'pfx_list_in' : prefix_list_in, 'pfx_list_out' : prefix_list_out, 'no_send_comm_ext' : '', @@ -177,7 +179,7 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): cls.cli_delete(cls, ['policy']) def setUp(self): - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) def tearDown(self): self.cli_delete(['vrf']) @@ -251,6 +253,9 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.assertIn(f' neighbor {peer} graceful-restart-disable', frrconfig) if 'graceful_rst_hlp' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-helper', frrconfig) + if 'disable_conn_chk' in peer_config: + self.assertIn(f' neighbor {peer} disable-connected-check', frrconfig) + def test_bgp_01_simple(self): router_id = '127.0.0.1' @@ -266,12 +271,12 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'log-neighbor-changes']) - # Local AS number MUST be defined - as this is set in setUp() we remove + # System AS number MUST be defined - as this is set in setUp() we remove # this once for testing of the proper error - self.cli_delete(base_path + ['local-as']) + self.cli_delete(base_path + ['system-as']) with self.assertRaises(ConfigSessionError): self.cli_commit() - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) # Default local preference (higher = more preferred, default value is 100) self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref]) @@ -282,12 +287,14 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['parameters', 'bestpath', 'as-path', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'bestpath', 'bandwidth', 'default-weight-for-missing']) self.cli_set(base_path + ['parameters', 'bestpath', 'compare-routerid']) + self.cli_set(base_path + ['parameters', 'bestpath', 'peer-type', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'conditional-advertisement', 'timer', cond_adv_timer]) self.cli_set(base_path + ['parameters', 'fast-convergence']) self.cli_set(base_path + ['parameters', 'minimum-holdtime', min_hold_time]) self.cli_set(base_path + ['parameters', 'no-suppress-duplicates']) self.cli_set(base_path + ['parameters', 'reject-as-sets']) + self.cli_set(base_path + ['parameters', 'route-reflector-allow-outbound-policy']) self.cli_set(base_path + ['parameters', 'shutdown']) self.cli_set(base_path + ['parameters', 'suppress-fib-pending']) @@ -313,8 +320,10 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.assertIn(f' bgp bestpath as-path multipath-relax', frrconfig) self.assertIn(f' bgp bestpath bandwidth default-weight-for-missing', frrconfig) self.assertIn(f' bgp bestpath compare-routerid', frrconfig) + self.assertIn(f' bgp bestpath peer-type multipath-relax', frrconfig) self.assertIn(f' bgp minimum-holdtime {min_hold_time}', frrconfig) self.assertIn(f' bgp reject-as-sets', frrconfig) + self.assertIn(f' bgp route-reflector allow-outbound-policy', frrconfig) self.assertIn(f' bgp shutdown', frrconfig) self.assertIn(f' bgp suppress-fib-pending', frrconfig) self.assertNotIn(f'bgp ebgp-requires-policy', frrconfig) @@ -400,6 +409,8 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'restart-helper']) + if 'disable_conn_chk' in peer_config: + self.cli_set(base_path + ['neighbor', peer, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in peer_config: @@ -488,6 +499,8 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'restart-helper']) + if 'disable_conn_chk' in config: + self.cli_set(base_path + ['peer-group', peer_group, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in config: @@ -760,13 +773,13 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): # templates and Jinja2 FRR template. table = '1000' - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) # testing only one AFI is sufficient as it's generic code for vrf in vrfs: vrf_base = ['vrf', 'name', vrf] self.cli_set(vrf_base + ['table', table]) - self.cli_set(vrf_base + ['protocols', 'bgp', 'local-as', ASN]) + self.cli_set(vrf_base + ['protocols', 'bgp', 'system-as', ASN]) self.cli_set(vrf_base + ['protocols', 'bgp', 'parameters', 'router-id', router_id]) self.cli_set(vrf_base + ['protocols', 'bgp', 'route-map', route_map_in]) table = str(int(table) + 1000) @@ -804,7 +817,7 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): confed_id = str(int(ASN) + 1) confed_asns = '10 20 30 40' - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'confederation', 'identifier', confed_id]) for asn in confed_asns.split(): @@ -825,7 +838,7 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): remote_asn = str(int(ASN) + 10) interface = 'eth0' - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) self.cli_set(base_path + ['neighbor', interface, 'address-family', 'ipv6-unicast']) self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as', remote_asn]) @@ -850,7 +863,7 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): rt_export = f'{neighbor}:1002 1.2.3.4:567' rt_import = f'{neighbor}:1003 500:100' - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) # testing only one AFI is sufficient as it's generic code for afi in ['ipv4-unicast', 'ipv6-unicast']: self.cli_set(base_path + ['address-family', afi, 'export', 'vpn']) @@ -889,7 +902,7 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): peer_group = 'bar' interface = 'eth0' - self.cli_set(base_path + ['local-as', ASN]) + self.cli_set(base_path + ['system-as', ASN]) self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) self.cli_set(base_path + ['neighbor', neighbor, 'peer-group', peer_group]) self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', remote_asn]) @@ -921,5 +934,31 @@ class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {remote_asn}', frrconfig) + def test_bgp_15_local_as_ebgp(self): + # https://phabricator.vyos.net/T4560 + # local-as allowed only for ebgp peers + + neighbor = '192.0.2.99' + remote_asn = '500' + local_asn = '400' + + self.cli_set(base_path + ['system-as', ASN]) + self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', ASN]) + self.cli_set(base_path + ['neighbor', neighbor, 'local-as', local_asn]) + + # check validate() - local-as allowed only for ebgp peers + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) + + self.cli_commit() + + frrconfig = self.getFRRconfig(f'router bgp {ASN}') + self.assertIn(f'router bgp {ASN}', frrconfig) + self.assertIn(f' neighbor {neighbor} remote-as {remote_asn}', frrconfig) + self.assertIn(f' neighbor {neighbor} local-as {local_asn}', frrconfig) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py index ee4be0b37..d11d80a1f 100755 --- a/smoketest/scripts/cli/test_protocols_isis.py +++ b/smoketest/scripts/cli/test_protocols_isis.py @@ -262,5 +262,51 @@ class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): self.assertIn(f' isis bfd', tmp) self.assertIn(f' isis bfd profile {bfd_profile}', tmp) + def test_isis_07_segment_routing_configuration(self): + global_block_low = "300" + global_block_high = "399" + local_block_low = "400" + local_block_high = "499" + interface = 'lo' + maximum_stack_size = '5' + prefix_one = '192.168.0.1/32' + prefix_two = '192.168.0.2/32' + prefix_three = '192.168.0.3/32' + prefix_four = '192.168.0.4/32' + prefix_one_value = '1' + prefix_two_value = '2' + prefix_three_value = '60000' + prefix_four_value = '65000' + + self.cli_set(base_path + ['net', net]) + self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) + self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) + self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) + self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) + self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'value', prefix_three_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'explicit-null']) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'value', prefix_four_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'no-php-flag']) + + # Commit all changes + self.cli_commit() + + # Verify all changes + tmp = self.getFRRconfig(f'router isis {domain}', daemon='isisd') + self.assertIn(f' net {net}', tmp) + self.assertIn(f' segment-routing on', tmp) + self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', tmp) + self.assertIn(f' segment-routing node-msd {maximum_stack_size}', tmp) + self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', tmp) + self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', tmp) + self.assertIn(f' segment-routing prefix {prefix_three} absolute {prefix_three_value} explicit-null', tmp) + self.assertIn(f' segment-routing prefix {prefix_four} absolute {prefix_four_value} no-php-flag', tmp) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_nhrp.py b/smoketest/scripts/cli/test_protocols_nhrp.py index 40b19fec7..59252875b 100755 --- a/smoketest/scripts/cli/test_protocols_nhrp.py +++ b/smoketest/scripts/cli/test_protocols_nhrp.py @@ -26,65 +26,79 @@ nhrp_path = ['protocols', 'nhrp'] vpn_path = ['vpn', 'ipsec'] class TestProtocolsNHRP(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestProtocolsNHRP, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, nhrp_path) + cls.cli_delete(cls, tunnel_path) + def tearDown(self): self.cli_delete(nhrp_path) self.cli_delete(tunnel_path) self.cli_commit() def test_config(self): - self.cli_delete(nhrp_path) - self.cli_delete(tunnel_path) + tunnel_if = "tun100" + tunnel_source = "192.0.2.1" + tunnel_encapsulation = "gre" + esp_group = "ESP-HUB" + ike_group = "IKE-HUB" + nhrp_secret = "vyos123" + nhrp_profile = "NHRPVPN" + ipsec_secret = "secret" # Tunnel - self.cli_set(tunnel_path + ["tun100", "address", "172.16.253.134/29"]) - self.cli_set(tunnel_path + ["tun100", "encapsulation", "gre"]) - self.cli_set(tunnel_path + ["tun100", "source-address", "192.0.2.1"]) - self.cli_set(tunnel_path + ["tun100", "multicast", "enable"]) - self.cli_set(tunnel_path + ["tun100", "parameters", "ip", "key", "1"]) + self.cli_set(tunnel_path + [tunnel_if, "address", "172.16.253.134/29"]) + self.cli_set(tunnel_path + [tunnel_if, "encapsulation", tunnel_encapsulation]) + self.cli_set(tunnel_path + [tunnel_if, "source-address", tunnel_source]) + self.cli_set(tunnel_path + [tunnel_if, "multicast", "enable"]) + self.cli_set(tunnel_path + [tunnel_if, "parameters", "ip", "key", "1"]) # NHRP - self.cli_set(nhrp_path + ["tunnel", "tun100", "cisco-authentication", "secret"]) - self.cli_set(nhrp_path + ["tunnel", "tun100", "holding-time", "300"]) - self.cli_set(nhrp_path + ["tunnel", "tun100", "multicast", "dynamic"]) - self.cli_set(nhrp_path + ["tunnel", "tun100", "redirect"]) - self.cli_set(nhrp_path + ["tunnel", "tun100", "shortcut"]) + self.cli_set(nhrp_path + ["tunnel", tunnel_if, "cisco-authentication", nhrp_secret]) + self.cli_set(nhrp_path + ["tunnel", tunnel_if, "holding-time", "300"]) + self.cli_set(nhrp_path + ["tunnel", tunnel_if, "multicast", "dynamic"]) + self.cli_set(nhrp_path + ["tunnel", tunnel_if, "redirect"]) + self.cli_set(nhrp_path + ["tunnel", tunnel_if, "shortcut"]) # IKE/ESP Groups - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "compression", "disable"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "lifetime", "1800"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "mode", "transport"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "pfs", "dh-group2"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "proposal", "1", "encryption", "aes256"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "proposal", "1", "hash", "sha1"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "proposal", "2", "encryption", "3des"]) - self.cli_set(vpn_path + ["esp-group", "ESP-HUB", "proposal", "2", "hash", "md5"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "ikev2-reauth", "no"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "key-exchange", "ikev1"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "lifetime", "3600"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "1", "dh-group", "2"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "1", "encryption", "aes256"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "1", "hash", "sha1"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "2", "dh-group", "2"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "2", "encryption", "aes128"]) - self.cli_set(vpn_path + ["ike-group", "IKE-HUB", "proposal", "2", "hash", "sha1"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "lifetime", "1800"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "mode", "transport"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "pfs", "dh-group2"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "1", "encryption", "aes256"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "1", "hash", "sha1"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "2", "encryption", "3des"]) + self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "2", "hash", "md5"]) + + self.cli_set(vpn_path + ["ike-group", ike_group, "key-exchange", "ikev1"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "lifetime", "3600"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "1", "dh-group", "2"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "1", "encryption", "aes256"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "1", "hash", "sha1"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "2", "dh-group", "2"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "2", "encryption", "aes128"]) + self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "2", "hash", "sha1"]) # Profile - Not doing full DMVPN checks here, just want to verify the profile name in the output self.cli_set(vpn_path + ["interface", "eth0"]) - self.cli_set(vpn_path + ["profile", "NHRPVPN", "authentication", "mode", "pre-shared-secret"]) - self.cli_set(vpn_path + ["profile", "NHRPVPN", "authentication", "pre-shared-secret", "secret"]) - self.cli_set(vpn_path + ["profile", "NHRPVPN", "bind", "tunnel", "tun100"]) - self.cli_set(vpn_path + ["profile", "NHRPVPN", "esp-group", "ESP-HUB"]) - self.cli_set(vpn_path + ["profile", "NHRPVPN", "ike-group", "IKE-HUB"]) + self.cli_set(vpn_path + ["profile", nhrp_profile, "authentication", "mode", "pre-shared-secret"]) + self.cli_set(vpn_path + ["profile", nhrp_profile, "authentication", "pre-shared-secret", ipsec_secret]) + self.cli_set(vpn_path + ["profile", nhrp_profile, "bind", "tunnel", tunnel_if]) + self.cli_set(vpn_path + ["profile", nhrp_profile, "esp-group", esp_group]) + self.cli_set(vpn_path + ["profile", nhrp_profile, "ike-group", ike_group]) self.cli_commit() opennhrp_lines = [ - 'interface tun100 #hub NHRPVPN', - 'cisco-authentication secret', - 'holding-time 300', - 'shortcut', - 'multicast dynamic', - 'redirect' + f'interface {tunnel_if} #hub {nhrp_profile}', + f'cisco-authentication {nhrp_secret}', + f'holding-time 300', + f'shortcut', + f'multicast dynamic', + f'redirect' ] tmp_opennhrp_conf = read_file('/run/opennhrp/opennhrp.conf') @@ -93,13 +107,13 @@ class TestProtocolsNHRP(VyOSUnitTestSHIM.TestCase): self.assertIn(line, tmp_opennhrp_conf) firewall_matches = [ - 'ip protocol gre', - 'ip saddr 192.0.2.1', - 'ip daddr 224.0.0.0/4', - 'comment "VYOS_NHRP_tun100"' + f'ip protocol {tunnel_encapsulation}', + f'ip saddr {tunnel_source}', + f'ip daddr 224.0.0.0/4', + f'comment "VYOS_NHRP_{tunnel_if}"' ] - self.assertTrue(find_nftables_rule('ip filter', 'VYOS_FW_OUTPUT', firewall_matches) is not None) + self.assertTrue(find_nftables_rule('ip vyos_nhrp_filter', 'VYOS_NHRP_OUTPUT', firewall_matches) is not None) self.assertTrue(process_named_running('opennhrp')) if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py index e15ea478b..339713bf6 100755 --- a/smoketest/scripts/cli/test_protocols_ospf.py +++ b/smoketest/scripts/cli/test_protocols_ospf.py @@ -14,8 +14,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import logging -import sys import unittest from base_vyostest_shim import VyOSUnitTestSHIM @@ -23,15 +21,12 @@ from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.util import process_named_running -from vyos.util import cmd PROCESS_NAME = 'ospfd' base_path = ['protocols', 'ospf'] route_map = 'foo-bar-baz10' -log = logging.getLogger('TestProtocolsOSPF') - class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): @@ -75,6 +70,8 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['auto-cost', 'reference-bandwidth', bandwidth]) self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'abr-type', abr_type]) + self.cli_set(base_path + ['parameters', 'opaque-lsa']) + self.cli_set(base_path + ['parameters', 'rfc1583-compatibility']) self.cli_set(base_path + ['log-adjacency-changes', 'detail']) self.cli_set(base_path + ['default-metric', metric]) @@ -84,10 +81,12 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf') self.assertIn(f'router ospf', frrconfig) + self.assertIn(f' compatible rfc1583', frrconfig) self.assertIn(f' auto-cost reference-bandwidth {bandwidth}', frrconfig) self.assertIn(f' ospf router-id {router_id}', frrconfig) self.assertIn(f' ospf abr-type {abr_type}', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults + self.assertIn(f' capability opaque', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) @@ -210,25 +209,14 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map]) self.cli_set(base_path + ['redistribute', protocol, 'metric-type', metric_type]) - # enable FRR debugging to find the root cause of failing testcases - cmd('touch /tmp/vyos.frr.debug') - # commit changes self.cli_commit() - # disable FRR debugging - cmd('rm -f /tmp/vyos.frr.debug') - # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf') - try: - self.assertIn(f'router ospf', frrconfig) - for protocol in redistribute: - self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) - except: - log.debug(frrconfig) - log.debug(cmd('sudo cat /tmp/vyos-configd-script-stdout')) - self.fail('Now we can hopefully see why OSPF fails!') + self.assertIn(f'router ospf', frrconfig) + for protocol in redistribute: + self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospf_08_virtual_link(self): networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] @@ -396,6 +384,41 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): self.assertIn(f' network {network} area {area}', frrconfig) self.assertIn(f' area {area} export-list {acl}', frrconfig) + + def test_ospf_14_segment_routing_configuration(self): + global_block_low = "300" + global_block_high = "399" + local_block_low = "400" + local_block_high = "499" + interface = 'lo' + maximum_stack_size = '5' + prefix_one = '192.168.0.1/32' + prefix_two = '192.168.0.2/32' + prefix_one_value = '1' + prefix_two_value = '2' + + self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) + self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) + self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) + self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) + self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) + self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) + + # Commit all changes + self.cli_commit() + + # Verify all changes + frrconfig = self.getFRRconfig('router ospf') + self.assertIn(f' segment-routing on', frrconfig) + self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', frrconfig) + self.assertIn(f' segment-routing node-msd {maximum_stack_size}', frrconfig) + self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', frrconfig) + self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', frrconfig) + + if __name__ == '__main__': - logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py index 5929f8cba..94e0597ad 100755 --- a/smoketest/scripts/cli/test_service_dns_forwarding.py +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2021 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -26,7 +26,7 @@ from vyos.util import process_named_running CONFIG_FILE = '/run/powerdns/recursor.conf' FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf' HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua' -PROCESS_NAME= 'pdns-r/worker' +PROCESS_NAME= 'pdns_recursor' base_path = ['service', 'dns', 'forwarding'] @@ -39,7 +39,18 @@ def get_config_value(key, file=CONFIG_FILE): return tmp[0] class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestServicePowerDNS, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + # Delete DNS forwarding configuration self.cli_delete(base_path) self.cli_commit() @@ -100,8 +111,9 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('serve-rfc1918') self.assertEqual(tmp, 'yes') - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # verify default port configuration + tmp = get_config_value('local-port') + self.assertEqual(tmp, '53') def test_dnssec(self): # DNSSEC option testing @@ -121,9 +133,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('dnssec') self.assertEqual(tmp, option) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_external_nameserver(self): # Externe Domain Name Servers (DNS) addresses @@ -147,9 +156,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('export-etc-hosts') self.assertEqual(tmp, 'yes') - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_domain_forwarding(self): for network in allow_from: self.cli_set(base_path + ['allow-from', network]) @@ -186,9 +192,6 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): if domain == domains[1]: self.assertIn(f'addNTA("{domain}", "static")', hosts_conf) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - def test_no_rfc1918_forwarding(self): for network in allow_from: self.cli_set(base_path + ['allow-from', network]) @@ -204,9 +207,42 @@ class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('serve-rfc1918') self.assertEqual(tmp, 'no') - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + def test_dns64(self): + dns_prefix = '64:ff9b::/96' -if __name__ == '__main__': - unittest.main(verbosity=2) + for network in allow_from: + self.cli_set(base_path + ['allow-from', network]) + for address in listen_adress: + self.cli_set(base_path + ['listen-address', address]) + + # Check dns64-prefix - must be prefix /96 + self.cli_set(base_path + ['dns64-prefix', '2001:db8:aabb::/64']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(base_path + ['dns64-prefix', dns_prefix]) + + # commit changes + self.cli_commit() + + # verify dns64-prefix configuration + tmp = get_config_value('dns64-prefix') + self.assertEqual(tmp, dns_prefix) + def test_listening_port(self): + # We can listen on a different port compared to '53' but only one at a time + for port in ['1053', '5353']: + self.cli_set(base_path + ['port', port]) + for network in allow_from: + self.cli_set(base_path + ['allow-from', network]) + for address in listen_adress: + self.cli_set(base_path + ['listen-address', address]) + + # commit changes + self.cli_commit() + + # verify local-port configuration + tmp = get_config_value('local-port') + self.assertEqual(tmp, port) + +if __name__ == '__main__': + unittest.main(verbosity=2, failfast=True) diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py index 71fb3e177..0f4b1393c 100755 --- a/smoketest/scripts/cli/test_service_https.py +++ b/smoketest/scripts/cli/test_service_https.py @@ -138,5 +138,106 @@ class TestHTTPSService(VyOSUnitTestSHIM.TestCase): # Must get HTTP code 401 on missing key (Unauthorized) self.assertEqual(r.status_code, 401) + # GraphQL auth test: a missing key will return status code 400, as + # 'key' is a non-nullable field in the schema; an incorrect key is + # caught by the resolver, and returns success 'False', so one must + # check the return value. + + self.cli_set(base_path + ['api', 'graphql']) + self.cli_commit() + + graphql_url = f'https://{address}/graphql' + + query_valid_key = f""" + {{ + SystemStatus (data: {{key: "{key}"}}) {{ + success + errors + data {{ + result + }} + }} + }} + """ + + r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_valid_key}) + success = r.json()['data']['SystemStatus']['success'] + self.assertTrue(success) + + query_invalid_key = """ + { + SystemStatus (data: {key: "invalid"}) { + success + errors + data { + result + } + } + } + """ + + r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_invalid_key}) + success = r.json()['data']['SystemStatus']['success'] + self.assertFalse(success) + + query_no_key = """ + { + SystemStatus (data: {}) { + success + errors + data { + result + } + } + } + """ + + r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_no_key}) + self.assertEqual(r.status_code, 400) + + # GraphQL token authentication test: request token; pass in header + # of query. + + self.cli_set(base_path + ['api', 'graphql', 'authentication', 'type', 'token']) + self.cli_commit() + + mutation = """ + mutation { + AuthToken (data: {username: "vyos", password: "vyos"}) { + success + errors + data { + result + } + } + } + """ + r = request('POST', graphql_url, verify=False, headers=headers, json={'query': mutation}) + + token = r.json()['data']['AuthToken']['data']['result']['token'] + + headers = {'Authorization': f'Bearer {token}'} + + query = """ + { + ShowVersion (data: {}) { + success + errors + op_mode_error { + name + message + vyos_code + } + data { + result + } + } + } + """ + + r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query}) + success = r.json()['data']['ShowVersion']['success'] + self.assertTrue(success) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ids.py b/smoketest/scripts/cli/test_service_ids.py index 18f1b8ec5..dcf2bcefe 100755 --- a/smoketest/scripts/cli/test_service_ids.py +++ b/smoketest/scripts/cli/test_service_ids.py @@ -24,7 +24,9 @@ from vyos.util import process_named_running from vyos.util import read_file PROCESS_NAME = 'fastnetmon' -FASTNETMON_CONF = '/etc/fastnetmon.conf' +FASTNETMON_CONF = '/run/fastnetmon/fastnetmon.conf' +NETWORKS_CONF = '/run/fastnetmon/networks_list' +EXCLUDED_NETWORKS_CONF = '/run/fastnetmon/excluded_networks_list' base_path = ['service', 'ids', 'ddos-protection'] class TestServiceIDS(VyOSUnitTestSHIM.TestCase): @@ -48,7 +50,8 @@ class TestServiceIDS(VyOSUnitTestSHIM.TestCase): self.assertFalse(process_named_running(PROCESS_NAME)) def test_fastnetmon(self): - networks = ['10.0.0.0/24', '10.5.5.0/24'] + networks = ['10.0.0.0/24', '10.5.5.0/24', '2001:db8:10::/64', '2001:db8:20::/64'] + excluded_networks = ['10.0.0.1/32', '2001:db8:10::1/128'] interfaces = ['eth0', 'eth1'] fps = '3500' mbps = '300' @@ -61,6 +64,12 @@ class TestServiceIDS(VyOSUnitTestSHIM.TestCase): for tmp in networks: self.cli_set(base_path + ['network', tmp]) + # optional excluded-network! + with self.assertRaises(ConfigSessionError): + self.cli_commit() + for tmp in excluded_networks: + self.cli_set(base_path + ['excluded-network', tmp]) + # Required interface(s)! with self.assertRaises(ConfigSessionError): self.cli_commit() @@ -68,9 +77,9 @@ class TestServiceIDS(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['listen-interface', tmp]) self.cli_set(base_path + ['direction', 'in']) - self.cli_set(base_path + ['threshold', 'fps', fps]) - self.cli_set(base_path + ['threshold', 'pps', pps]) - self.cli_set(base_path + ['threshold', 'mbps', mbps]) + self.cli_set(base_path + ['threshold', 'general', 'fps', fps]) + self.cli_set(base_path + ['threshold', 'general', 'pps', pps]) + self.cli_set(base_path + ['threshold', 'general', 'mbps', mbps]) # commit changes self.cli_commit() @@ -86,9 +95,22 @@ class TestServiceIDS(VyOSUnitTestSHIM.TestCase): self.assertIn(f'threshold_mbps = {mbps}', config) self.assertIn(f'ban_for_pps = on', config) self.assertIn(f'threshold_pps = {pps}', config) + # default + self.assertIn(f'enable_ban = on', config) + self.assertIn(f'enable_ban_ipv6 = on', config) + self.assertIn(f'ban_time = 1900', config) tmp = ','.join(interfaces) self.assertIn(f'interfaces = {tmp}', config) + + network_config = read_file(NETWORKS_CONF) + for tmp in networks: + self.assertIn(f'{tmp}', network_config) + + excluded_network_config = read_file(EXCLUDED_NETWORKS_CONF) + for tmp in excluded_networks: + self.assertIn(f'{tmp}', excluded_network_config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py new file mode 100755 index 000000000..bdab35834 --- /dev/null +++ b/smoketest/scripts/cli/test_service_ipoe-server.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import unittest + +from base_accel_ppp_test import BasicAccelPPPTest +from vyos.configsession import ConfigSessionError +from vyos.util import cmd + +from configparser import ConfigParser + +ac_name = 'ACN' +interface = 'eth0' + +class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): + @classmethod + def setUpClass(cls): + cls._base_path = ['service', 'ipoe-server'] + cls._config_file = '/run/accel-pppd/ipoe.conf' + cls._chap_secrets = '/run/accel-pppd/ipoe.chap-secrets' + + # call base-classes classmethod + super(TestServiceIPoEServer, cls).setUpClass() + + def verify(self, conf): + super().verify(conf) + + # Validate configuration values + accel_modules = list(conf['modules'].keys()) + self.assertIn('log_syslog', accel_modules) + self.assertIn('ipoe', accel_modules) + self.assertIn('shaper', accel_modules) + self.assertIn('ipv6pool', accel_modules) + self.assertIn('ipv6_nd', accel_modules) + self.assertIn('ipv6_dhcp', accel_modules) + self.assertIn('ippool', accel_modules) + + def basic_config(self): + self.set(['interface', interface, 'client-subnet', '192.168.0.0/24']) + + def test_accel_local_authentication(self): + mac_address = '08:00:27:2f:d8:06' + self.set(['authentication', 'interface', interface, 'mac', mac_address]) + self.set(['authentication', 'mode', 'local']) + + # No IPoE interface configured + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # Test configuration of local authentication for PPPoE server + self.basic_config() + + # commit changes + self.cli_commit() + + # Validate configuration values + conf = ConfigParser(allow_no_value=True, delimiters='=') + conf.read(self._config_file) + + # check proper path to chap-secrets file + self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets) + + accel_modules = list(conf['modules'].keys()) + self.assertIn('chap-secrets', accel_modules) + + # basic verification + self.verify(conf) + + # check local users + tmp = cmd(f'sudo cat {self._chap_secrets}') + regex = f'{interface}\s+\*\s+{mac_address}\s+\*' + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + +if __name__ == '__main__': + unittest.main(verbosity=2) + diff --git a/smoketest/scripts/cli/test_service_monitoring_telegraf.py b/smoketest/scripts/cli/test_service_monitoring_telegraf.py index 09937513e..ed486c3b9 100755 --- a/smoketest/scripts/cli/test_service_monitoring_telegraf.py +++ b/smoketest/scripts/cli/test_service_monitoring_telegraf.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -24,7 +24,7 @@ from vyos.util import process_named_running from vyos.util import read_file PROCESS_NAME = 'telegraf' -TELEGRAF_CONF = '/run/telegraf/vyos-telegraf.conf' +TELEGRAF_CONF = '/run/telegraf/telegraf.conf' base_path = ['service', 'monitoring', 'telegraf'] org = 'log@in.local' token = 'GuRJc12tIzfjnYdKRAIYbxdWd2aTpOT9PVYNddzDnFV4HkAcD7u7-kndTFXjGuXzJN6TTxmrvPODB4mnFcseDV==' @@ -35,21 +35,24 @@ inputs = ['cpu', 'disk', 'mem', 'net', 'system', 'kernel', 'interrupts', 'syslog class TestMonitoringTelegraf(VyOSUnitTestSHIM.TestCase): def tearDown(self): + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(base_path) self.cli_commit() + # Check for not longer running process + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_01_basic_config(self): - self.cli_set(base_path + ['authentication', 'organization', org]) - self.cli_set(base_path + ['authentication', 'token', token]) - self.cli_set(base_path + ['port', port]) - self.cli_set(base_path + ['url', url]) + self.cli_set(base_path + ['influxdb', 'authentication', 'organization', org]) + self.cli_set(base_path + ['influxdb', 'authentication', 'token', token]) + self.cli_set(base_path + ['influxdb', 'port', port]) + self.cli_set(base_path + ['influxdb', 'url', url]) # commit changes self.cli_commit() - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) - config = read_file(TELEGRAF_CONF) # Check telegraf config @@ -57,6 +60,7 @@ class TestMonitoringTelegraf(VyOSUnitTestSHIM.TestCase): self.assertIn(f' token = "$INFLUX_TOKEN"', config) self.assertIn(f'urls = ["{url}:{port}"]', config) self.assertIn(f'bucket = "{bucket}"', config) + self.assertIn(f'[[inputs.exec]]', config) for input in inputs: self.assertIn(input, config) diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index 51cc098ef..7546c2e3d 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 020 VyOS maintainers and contributors +# Copyright (C) 2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -14,27 +14,27 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import unittest from base_accel_ppp_test import BasicAccelPPPTest from configparser import ConfigParser -from vyos.configsession import ConfigSessionError -from vyos.util import process_named_running +from vyos.util import read_file +from vyos.template import range_to_regex local_if = ['interfaces', 'dummy', 'dum667'] ac_name = 'ACN' interface = 'eth0' class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): - def setUp(self): - self._base_path = ['service', 'pppoe-server'] - self._process_name = 'accel-pppd' - self._config_file = '/run/accel-pppd/pppoe.conf' - self._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets' + @classmethod + def setUpClass(cls): + cls._base_path = ['service', 'pppoe-server'] + cls._config_file = '/run/accel-pppd/pppoe.conf' + cls._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets' - super().setUp() + # call base-classes classmethod + super(TestServicePPPoEServer, cls).setUpClass() def tearDown(self): self.cli_delete(local_if) @@ -120,8 +120,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): # check interface-cache self.assertEqual(conf['ppp']['unit-cache'], interface_cache) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) def test_pppoe_server_authentication_protocols(self): # Test configuration of local authentication for PPPoE server @@ -139,8 +137,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['modules']['auth_mschap_v2'], None) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) def test_pppoe_server_client_ip_pool(self): # Test configuration of IPv6 client pools @@ -168,9 +164,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['ip-pool'][start_stop], None) self.assertEqual(conf['ip-pool']['gw-ip-address'], self._gateway) - # Check for running process - self.assertTrue(process_named_running(self._process_name)) - def test_pppoe_server_client_ipv6_pool(self): # Test configuration of IPv6 client pools @@ -211,9 +204,6 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['ipv6-pool'][client_prefix], None) self.assertEqual(conf['ipv6-pool']['delegate'], f'{delegate_prefix},{delegate_mask}') - # Check for running process - self.assertTrue(process_named_running(self._process_name)) - def test_accel_radius_authentication(self): radius_called_sid = 'ifname:mac' @@ -234,5 +224,27 @@ class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): self.assertEqual(conf['radius']['acct-interim-jitter'], radius_acct_interim_jitter) + def test_pppoe_server_vlan(self): + + vlans = ['100', '200', '300-310'] + + # Test configuration of local authentication for PPPoE server + self.basic_config() + + for vlan in vlans: + self.set(['interface', interface, 'vlan', vlan]) + + # commit changes + self.cli_commit() + + # Validate configuration values + config = read_file(self._config_file) + for vlan in vlans: + tmp = range_to_regex(vlan) + self.assertIn(f'interface=re:^{interface}\.{tmp}$', config) + + tmp = ','.join(vlans) + self.assertIn(f'vlan-mon={interface},{tmp}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py index 4875fb5d1..873be7df0 100755 --- a/smoketest/scripts/cli/test_service_router-advert.py +++ b/smoketest/scripts/cli/test_service_router-advert.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2020 VyOS maintainers and contributors +# Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -17,16 +17,19 @@ import re import unittest +from vyos.configsession import ConfigSessionError from base_vyostest_shim import VyOSUnitTestSHIM from vyos.util import read_file from vyos.util import process_named_running +PROCESS_NAME = 'radvd' RADVD_CONF = '/run/radvd/radvd.conf' interface = 'eth1' base_path = ['service', 'router-advert', 'interface', interface] address_base = ['interfaces', 'ethernet', interface, 'address'] +prefix = '::/64' def get_config_value(key): tmp = read_file(RADVD_CONF) @@ -34,18 +37,36 @@ def get_config_value(key): return tmp[0].split()[0].replace(';','') class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): - def setUp(self): - self.cli_set(address_base + ['2001:db8::1/64']) + + @classmethod + def setUpClass(cls): + super(TestServiceRADVD, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, ['service', 'router-advert']) + + cls.cli_set(cls, address_base + ['2001:db8::1/64']) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, address_base) + super(TestServiceRADVD, cls).tearDownClass() def tearDown(self): - self.cli_delete(address_base) + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(base_path) self.cli_commit() + # Check for no longer running process + self.assertFalse(process_named_running(PROCESS_NAME)) + def test_common(self): - self.cli_set(base_path + ['prefix', '::/64', 'no-on-link-flag']) - self.cli_set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) - self.cli_set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) + self.cli_set(base_path + ['prefix', prefix, 'no-on-link-flag']) + self.cli_set(base_path + ['prefix', prefix, 'no-autonomous-flag']) + self.cli_set(base_path + ['prefix', prefix, 'valid-lifetime', 'infinity']) self.cli_set(base_path + ['other-config-flag']) # commit changes @@ -56,7 +77,7 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): self.assertEqual(tmp, interface) tmp = get_config_value('prefix') - self.assertEqual(tmp, '::/64') + self.assertEqual(tmp, prefix) tmp = get_config_value('AdvOtherConfigFlag') self.assertEqual(tmp, 'on') @@ -87,14 +108,19 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): tmp = get_config_value('AdvOnLink') self.assertEqual(tmp, 'off') - # Check for running process - self.assertTrue(process_named_running('radvd')) + tmp = get_config_value('DeprecatePrefix') + self.assertEqual(tmp, 'off') + + tmp = get_config_value('DecrementLifetimes') + self.assertEqual(tmp, 'off') + def test_dns(self): nameserver = ['2001:db8::1', '2001:db8::2'] dnssl = ['vyos.net', 'vyos.io'] + ns_lifetime = '599' - self.cli_set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) + self.cli_set(base_path + ['prefix', prefix, 'valid-lifetime', 'infinity']) self.cli_set(base_path + ['other-config-flag']) for ns in nameserver: @@ -102,6 +128,14 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): for sl in dnssl: self.cli_set(base_path + ['dnssl', sl]) + self.cli_set(base_path + ['name-server-lifetime', ns_lifetime]) + # The value, if not 0, must be at least interval max (defaults to 600). + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + ns_lifetime = '600' + self.cli_set(base_path + ['name-server-lifetime', ns_lifetime]) + # commit changes self.cli_commit() @@ -110,8 +144,28 @@ class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): tmp = 'RDNSS ' + ' '.join(nameserver) + ' {' self.assertIn(tmp, config) + tmp = f'AdvRDNSSLifetime {ns_lifetime};' + self.assertIn(tmp, config) + tmp = 'DNSSL ' + ' '.join(dnssl) + ' {' self.assertIn(tmp, config) + + def test_deprecate_prefix(self): + self.cli_set(base_path + ['prefix', prefix, 'valid-lifetime', 'infinity']) + self.cli_set(base_path + ['prefix', prefix, 'deprecate-prefix']) + self.cli_set(base_path + ['prefix', prefix, 'decrement-lifetime']) + + # commit changes + self.cli_commit() + + config = read_file(RADVD_CONF) + + tmp = get_config_value('DeprecatePrefix') + self.assertEqual(tmp, 'on') + + tmp = get_config_value('DecrementLifetimes') + self.assertEqual(tmp, 'on') + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 0b029dd00..8de98f34f 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -262,5 +262,42 @@ class TestServiceSSH(VyOSUnitTestSHIM.TestCase): self.assertFalse(process_named_running(SSHGUARD_PROCESS)) + + # Network Device Collaborative Protection Profile + def test_ssh_ndcpp(self): + ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr'] + host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519'] + kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521'] + macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512'] + rekey_time = '60' + rekey_data = '1024' + + for cipher in ciphers: + self.cli_set(base_path + ['ciphers', cipher]) + for host_key in host_key_algs: + self.cli_set(base_path + ['hostkey-algorithm', host_key]) + for kex in kexes: + self.cli_set(base_path + ['key-exchange', kex]) + for mac in macs: + self.cli_set(base_path + ['mac', mac]) + # Optional rekey parameters + self.cli_set(base_path + ['rekey', 'data', rekey_data]) + self.cli_set(base_path + ['rekey', 'time', rekey_time]) + + # commit changes + self.cli_commit() + + ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr', + 'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519', + 'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512', + 'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521', + 'RekeyLimit 1024M 60M' + ] + tmp_sshd_conf = read_file(SSHD_CONF) + + for line in ssh_lines: + self.assertIn(line, tmp_sshd_conf) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_webproxy.py b/smoketest/scripts/cli/test_service_webproxy.py index 772d6ab16..fb9b46a06 100755 --- a/smoketest/scripts/cli/test_service_webproxy.py +++ b/smoketest/scripts/cli/test_service_webproxy.py @@ -87,6 +87,8 @@ class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase): max_obj_size = '8192' block_mine = ['application/pdf', 'application/x-sh'] body_max_size = '4096' + safe_port = '88' + ssl_safe_port = '8443' self.cli_set(base_path + ['listen-address', listen_ip]) self.cli_set(base_path + ['append-domain', domain]) @@ -104,6 +106,9 @@ class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['reply-body-max-size', body_max_size]) + self.cli_set(base_path + ['safe-ports', safe_port]) + self.cli_set(base_path + ['ssl-safe-ports', ssl_safe_port]) + # commit changes self.cli_commit() @@ -122,6 +127,9 @@ class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase): self.assertIn(f'reply_body_max_size {body_max_size} KB', config) + self.assertIn(f'acl Safe_ports port {safe_port}', config) + self.assertIn(f'acl SSL_ports port {ssl_safe_port}', config) + # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index 95c2a6c55..fd16146b1 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -59,7 +59,7 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): }, 'net.netfilter.nf_conntrack_tcp_max_retrans' :{ 'cli' : ['tcp', 'max-retrans'], - 'test_value' : '1024', + 'test_value' : '128', 'default_value' : '3', }, 'net.netfilter.nf_conntrack_icmp_timeout' :{ diff --git a/smoketest/scripts/cli/test_system_flow-accounting.py b/smoketest/scripts/cli/test_system_flow-accounting.py index a6eef3fb6..df60b9613 100755 --- a/smoketest/scripts/cli/test_system_flow-accounting.py +++ b/smoketest/scripts/cli/test_system_flow-accounting.py @@ -144,14 +144,15 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): self.assertNotIn(f'plugins: memory', uacctd) for server, server_config in sflow_server.items(): + plugin_name = server.replace('.', '-') if 'port' in server_config: - self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}', uacctd) + self.assertIn(f'sfprobe_receiver[sf_{plugin_name}]: {server}', uacctd) else: - self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}:6343', uacctd) + self.assertIn(f'sfprobe_receiver[sf_{plugin_name}]: {server}:6343', uacctd) - self.assertIn(f'sfprobe_agentip[sf_{server}]: {agent_address}', uacctd) - self.assertIn(f'sampling_rate[sf_{server}]: {sampling_rate}', uacctd) - self.assertIn(f'sfprobe_source_ip[sf_{server}]: {source_address}', uacctd) + self.assertIn(f'sfprobe_agentip[sf_{plugin_name}]: {agent_address}', uacctd) + self.assertIn(f'sampling_rate[sf_{plugin_name}]: {sampling_rate}', uacctd) + self.assertIn(f'sfprobe_source_ip[sf_{plugin_name}]: {source_address}', uacctd) self.cli_delete(['interfaces', 'dummy', dummy_if]) @@ -194,8 +195,7 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): for server, server_config in sflow_server.items(): tmp_srv = server - if is_ipv6(tmp_srv): - tmp_srv = tmp_srv.replace(':', '.') + tmp_srv = tmp_srv.replace(':', '-') if 'port' in server_config: self.assertIn(f'sfprobe_receiver[sf_{tmp_srv}]: {bracketize_ipv6(server)}', uacctd) @@ -265,16 +265,16 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): tmp = [] for server, server_config in netflow_server.items(): tmp_srv = server - if is_ipv6(tmp_srv): - tmp_srv = tmp_srv.replace(':', '.') + tmp_srv = tmp_srv.replace('.', '-') + tmp_srv = tmp_srv.replace(':', '-') tmp.append(f'nfprobe[nf_{tmp_srv}]') tmp.append('memory') self.assertIn('plugins: ' + ','.join(tmp), uacctd) for server, server_config in netflow_server.items(): tmp_srv = server - if is_ipv6(tmp_srv): - tmp_srv = tmp_srv.replace(':', '.') + tmp_srv = tmp_srv.replace('.', '-') + tmp_srv = tmp_srv.replace(':', '-') self.assertIn(f'nfprobe_engine[nf_{tmp_srv}]: {engine_id}', uacctd) self.assertIn(f'nfprobe_maxflows[nf_{tmp_srv}]: {max_flows}', uacctd) diff --git a/smoketest/scripts/cli/test_system_ip.py b/smoketest/scripts/cli/test_system_ip.py index 83df9d99e..f71ef5b3f 100755 --- a/smoketest/scripts/cli/test_system_ip.py +++ b/smoketest/scripts/cli/test_system_ip.py @@ -28,7 +28,7 @@ class TestSystemIP(VyOSUnitTestSHIM.TestCase): def test_system_ip_forwarding(self): # Test if IPv4 forwarding can be disabled globally, default is '1' - # which means forwearding enabled + # which means forwarding enabled all_forwarding = '/proc/sys/net/ipv4/conf/all/forwarding' self.assertEqual(read_file(all_forwarding), '1') @@ -37,6 +37,17 @@ class TestSystemIP(VyOSUnitTestSHIM.TestCase): self.assertEqual(read_file(all_forwarding), '0') + def test_system_ip_directed_broadcast_forwarding(self): + # Test if IPv4 directed broadcast forwarding can be disabled globally, + # default is '1' which means forwarding enabled + bc_forwarding = '/proc/sys/net/ipv4/conf/all/bc_forwarding' + self.assertEqual(read_file(bc_forwarding), '1') + + self.cli_set(base_path + ['disable-directed-broadcast']) + self.cli_commit() + + self.assertEqual(read_file(bc_forwarding), '0') + def test_system_ip_multipath(self): # Test IPv4 multipathing options, options default to off -> '0' use_neigh = '/proc/sys/net/ipv4/fib_multipath_use_neigh' diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 1131b6f93..6006fe0f6 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -46,6 +46,14 @@ TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk= """ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestSystemLogin, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration which will break this test + cls.cli_delete(cls, base_path + ['radius']) + def tearDown(self): # Delete individual users from configuration for user in users: @@ -97,6 +105,22 @@ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): # b'Linux LR1.wue3 5.10.61-amd64-vyos #1 SMP Fri Aug 27 08:55:46 UTC 2021 x86_64 GNU/Linux\n' self.assertTrue(len(stdout) > 40) + def test_system_login_otp(self): + otp_user = 'otp-test_user' + otp_password = 'SuperTestPassword' + otp_key = '76A3ZS6HFHBTOK2H4NDHTIVFPQ' + + self.cli_set(base_path + ['user', otp_user, 'authentication', 'plaintext-password', otp_password]) + self.cli_set(base_path + ['user', otp_user, 'authentication', 'otp', 'key', otp_key]) + + self.cli_commit() + + # Check if OTP key was written properly + tmp = cmd(f'sudo head -1 /home/{otp_user}/.google_authenticator') + self.assertIn(otp_key, tmp) + + self.cli_delete(base_path + ['user', otp_user]) + def test_system_user_ssh_key(self): ssh_user = 'ssh-test_user' public_keys = 'vyos_test@domain-foo.com' diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index e2821687c..a0806acf0 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -108,5 +108,22 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase): for listen in listen_address: self.assertIn(f'interface listen {listen}', config) + def test_03_ntp_interface(self): + interfaces = ['eth0', 'eth1'] + for interface in interfaces: + self.cli_set(base_path + ['interface', interface]) + + servers = ['time1.vyos.net', 'time2.vyos.net'] + for server in servers: + self.cli_set(base_path + ['server', server]) + + self.cli_commit() + + # Check generated client address configuration + config = read_file(NTP_CONF) + self.assertIn('interface ignore wildcard', config) + for interface in interfaces: + self.assertIn(f'interface listen {interface}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py index 8a6514d57..bd242104f 100755 --- a/smoketest/scripts/cli/test_vpn_ipsec.py +++ b/smoketest/scripts/cli/test_vpn_ipsec.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -33,6 +33,7 @@ dhcp_waiting_file = '/tmp/ipsec_dhcp_waiting' swanctl_file = '/etc/swanctl/swanctl.conf' peer_ip = '203.0.113.45' +connection_name = 'main-branch' interface = 'eth1' vif = '100' esp_group = 'MyESPGroup' @@ -150,7 +151,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(ethernet_path + [interface, 'vif', vif, 'address', 'dhcp']) # Use VLAN to avoid getting IP from qemu dhcp server # Site to site - peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] + peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['authentication', 'pre-shared-secret', secret]) self.cli_set(peer_base_path + ['ike-group', ike_group]) @@ -173,7 +174,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): priority = '20' life_bytes = '100000' life_packets = '2000000' - peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] + peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(base_path + ['esp-group', esp_group, 'life-bytes', life_bytes]) self.cli_set(base_path + ['esp-group', esp_group, 'life-packets', life_packets]) @@ -183,6 +184,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) + self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'protocol', 'tcp']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.11.0/24']) @@ -211,11 +213,11 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', f'mode = tunnel', - f'peer_{peer_ip.replace(".","-")}_tunnel_1', + f'{connection_name}-tunnel-1', f'local_ts = 172.16.10.0/24[tcp/443],172.16.11.0/24[tcp/443]', f'remote_ts = 172.17.10.0/24[tcp/443],172.17.11.0/24[tcp/443]', f'mode = tunnel', - f'peer_{peer_ip.replace(".","-")}_tunnel_2', + f'{connection_name}-tunnel-2', f'local_ts = 10.1.0.0/16', f'remote_ts = 10.2.0.0/16', f'priority = {priority}', @@ -226,7 +228,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): swanctl_secrets_lines = [ f'id-local = {local_address} # dhcp:no', - f'id-remote = {peer_ip}', + f'id-remote_{peer_ip.replace(".","-")} = {peer_ip}', f'secret = "{secret}"' ] for line in swanctl_secrets_lines: @@ -236,18 +238,24 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): def test_03_site_to_site_vti(self): local_address = '192.0.2.10' vti = 'vti10' + # IKE + self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) + self.cli_set(base_path + ['ike-group', ike_group, 'disable-mobike']) + # ESP + self.cli_set(base_path + ['esp-group', esp_group, 'compression']) # VTI interface self.cli_set(vti_path + [vti, 'address', '10.1.1.1/24']) - self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) # Site to site - peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] + peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['authentication', 'pre-shared-secret', secret]) self.cli_set(peer_base_path + ['connection-type', 'none']) + self.cli_set(peer_base_path + ['force-udp-encapsulation']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) + self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.11.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.10.0/24']) @@ -269,10 +277,12 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'proposals = aes128-sha1-modp1024', f'esp_proposals = aes128-sha1-modp1024', f'local_addrs = {local_address} # dhcp:no', + f'mobike = no', f'remote_addrs = {peer_ip}', f'mode = tunnel', f'local_ts = 172.16.10.0/24,172.16.11.0/24', f'remote_ts = 172.17.10.0/24,172.17.11.0/24', + f'ipcomp = yes', f'start_action = none', f'if_id_in = {if_id}', # will be 11 for vti10 - shifted by one f'if_id_out = {if_id}', @@ -283,7 +293,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): swanctl_secrets_lines = [ f'id-local = {local_address} # dhcp:no', - f'id-remote = {peer_ip}', + f'id-remote_{peer_ip.replace(".","-")} = {peer_ip}', f'secret = "{secret}"' ] for line in swanctl_secrets_lines: @@ -311,7 +321,6 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'shortcut']) # IKE/ESP Groups - self.cli_set(base_path + ['esp-group', esp_group, 'compression', 'disable']) self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', esp_lifetime]) self.cli_set(base_path + ['esp-group', esp_group, 'mode', 'transport']) self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'dh-group2']) @@ -320,7 +329,6 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', '3des']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'md5']) - self.cli_set(base_path + ['ike-group', ike_group, 'ikev2-reauth', 'no']) self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev1']) self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', ike_lifetime]) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '2']) @@ -366,10 +374,11 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(vti_path + [vti, 'address', '192.168.0.1/31']) peer_ip = '172.18.254.202' + connection_name = 'office' local_address = '172.18.254.201' - peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] + peer_base_path = base_path + ['site-to-site', 'peer', connection_name] - self.cli_set(peer_base_path + ['authentication', 'id', peer_name]) + self.cli_set(peer_base_path + ['authentication', 'local-id', peer_name]) self.cli_set(peer_base_path + ['authentication', 'mode', 'x509']) self.cli_set(peer_base_path + ['authentication', 'remote-id', 'peer2']) self.cli_set(peer_base_path + ['authentication', 'x509', 'ca-certificate', ca_name]) @@ -378,6 +387,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['ikev2-reauth', 'inherit']) self.cli_set(peer_base_path + ['local-address', local_address]) + self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['vti', 'bind', vti]) self.cli_set(peer_base_path + ['vti', 'esp-group', esp_group]) @@ -391,7 +401,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): # to also support a vti0 interface if_id = str(int(if_id) +1) swanctl_lines = [ - f'peer_{tmp}', + f'{connection_name}', f'version = 0', # key-exchange not set - defaulting to 0 for ikev1 and ikev2 f'send_cert = always', f'mobike = yes', @@ -416,7 +426,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ - f'peer_{tmp}', + f'{connection_name}', f'file = {peer_name}.pem', ] for line in swanctl_secrets_lines: @@ -430,7 +440,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): local_address = '192.0.2.5' local_id = 'vyos-r1' remote_id = 'vyos-r2' - peer_base_path = base_path + ['site-to-site', 'peer', peer_ip] + peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(tunnel_path + ['tun1', 'encapsulation', 'gre']) self.cli_set(tunnel_path + ['tun1', 'source-address', local_address]) @@ -438,10 +448,9 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['options', 'flexvpn']) self.cli_set(base_path + ['options', 'interface', 'tun1']) - self.cli_set(base_path + ['ike-group', ike_group, 'ikev2-reauth', 'no']) self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) - self.cli_set(peer_base_path + ['authentication', 'id', local_id]) + self.cli_set(peer_base_path + ['authentication', 'local-id', local_id]) self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['authentication', 'pre-shared-secret', secret]) self.cli_set(peer_base_path + ['authentication', 'remote-id', remote_id]) @@ -449,6 +458,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) + self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'protocol', 'gre']) self.cli_set(peer_base_path + ['virtual-address', '203.0.113.55']) @@ -464,7 +474,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): f'life_time = 3600s', # default value f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', - f'peer_{peer_ip.replace(".","-")}_tunnel_1', + f'{connection_name}-tunnel-1', f'mode = tunnel', ] @@ -473,7 +483,7 @@ class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): swanctl_secrets_lines = [ f'id-local = {local_address} # dhcp:no', - f'id-remote = {peer_ip}', + f'id-remote_{peer_ip.replace(".","-")} = {peer_ip}', f'id-localid = {local_id}', f'id-remoteid = {remote_id}', f'secret = "{secret}"', diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py index bda279342..8572d6d66 100755 --- a/smoketest/scripts/cli/test_vpn_openconnect.py +++ b/smoketest/scripts/cli/test_vpn_openconnect.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,6 +19,7 @@ import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.util import process_named_running +from vyos.util import read_file OCSERV_CONF = '/run/ocserv/ocserv.conf' base_path = ['vpn', 'openconnect'] @@ -46,36 +47,84 @@ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww """ -class TestVpnOpenconnect(VyOSUnitTestSHIM.TestCase): +PROCESS_NAME = 'ocserv-main' +config_file = '/run/ocserv/ocserv.conf' +auth_file = '/run/ocserv/ocpasswd' +otp_file = '/run/ocserv/users.oath' + +class TestVPNOpenConnect(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestVPNOpenConnect, cls).setUpClass() + + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + + cls.cli_set(cls, pki_path + ['ca', 'openconnect', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'openconnect', 'private', 'key', key_data.replace('\n','')]) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, pki_path) + super(TestVPNOpenConnect, cls).tearDownClass() + def tearDown(self): - # Delete vpn openconnect configuration - self.cli_delete(pki_path) + self.assertTrue(process_named_running(PROCESS_NAME)) + self.cli_delete(base_path) self.cli_commit() - def test_vpn(self): + self.assertFalse(process_named_running(PROCESS_NAME)) + + def test_ocserv(self): user = 'vyos_user' password = 'vyos_pass' otp = '37500000026900000000200000000000' - - self.cli_delete(pki_path) - self.cli_delete(base_path) - - self.cli_set(pki_path + ['ca', 'openconnect', 'certificate', cert_data.replace('\n','')]) - self.cli_set(pki_path + ['certificate', 'openconnect', 'certificate', cert_data.replace('\n','')]) - self.cli_set(pki_path + ['certificate', 'openconnect', 'private', 'key', key_data.replace('\n','')]) + v4_subnet = '192.0.2.0/24' + v6_prefix = '2001:db8:1000::/64' + v6_len = '126' + name_server = ['1.2.3.4', '1.2.3.5', '2001:db8::1'] + split_dns = ['vyos.net', 'vyos.io'] self.cli_set(base_path + ['authentication', 'local-users', 'username', user, 'password', password]) self.cli_set(base_path + ['authentication', 'local-users', 'username', user, 'otp', 'key', otp]) self.cli_set(base_path + ['authentication', 'mode', 'local', 'password-otp']) - self.cli_set(base_path + ['network-settings', 'client-ip-settings', 'subnet', '192.0.2.0/24']) + + self.cli_set(base_path + ['network-settings', 'client-ip-settings', 'subnet', v4_subnet]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'prefix', v6_prefix]) + self.cli_set(base_path + ['network-settings', 'client-ipv6-pool', 'mask', v6_len]) + + for ns in name_server: + self.cli_set(base_path + ['network-settings', 'name-server', ns]) + for domain in split_dns: + self.cli_set(base_path + ['network-settings', 'split-dns', domain]) + self.cli_set(base_path + ['ssl', 'ca-certificate', 'openconnect']) self.cli_set(base_path + ['ssl', 'certificate', 'openconnect']) self.cli_commit() - # Check for running process - self.assertTrue(process_named_running('ocserv-main')) + # Verify configuration + daemon_config = read_file(config_file) + + # authentication mode local password-otp + self.assertIn(f'auth = "plain[passwd=/run/ocserv/ocpasswd,otp=/run/ocserv/users.oath]"', daemon_config) + self.assertIn(f'ipv4-network = {v4_subnet}', daemon_config) + self.assertIn(f'ipv6-network = {v6_prefix}', daemon_config) + self.assertIn(f'ipv6-subnet-prefix = {v6_len}', daemon_config) + + for ns in name_server: + self.assertIn(f'dns = {ns}', daemon_config) + for domain in split_dns: + self.assertIn(f'split-dns = {domain}', daemon_config) + + auth_config = read_file(auth_file) + self.assertIn(f'{user}:*:$', auth_config) + + otp_config = read_file(otp_file) + self.assertIn(f'HOTP/T30/6 {user} - {otp}', otp_config) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py index 24673278b..434e3aa05 100755 --- a/smoketest/scripts/cli/test_vpn_sstp.py +++ b/smoketest/scripts/cli/test_vpn_sstp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -17,29 +17,51 @@ import unittest from base_accel_ppp_test import BasicAccelPPPTest -from vyos.util import cmd +from vyos.util import read_file pki_path = ['pki'] -cert_data = 'MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIwWTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIxMDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu+JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3LftzngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93+dm/LDnp7C0=' -key_data = 'MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww' + +cert_data = """ +MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIw +WTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv +bWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIx +MDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNV +BAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlP +UzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE +01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3 +QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8E +BAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu ++JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3Lftz +ngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93 ++dm/LDnp7C0=""" + +key_data = """ +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx +2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7 +u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww +""" class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): - def setUp(self): - self._base_path = ['vpn', 'sstp'] - self._process_name = 'accel-pppd' - self._config_file = '/run/accel-pppd/sstp.conf' - self._chap_secrets = '/run/accel-pppd/sstp.chap-secrets' - super().setUp() + @classmethod + def setUpClass(cls): + cls._base_path = ['vpn', 'sstp'] + cls._config_file = '/run/accel-pppd/sstp.conf' + cls._chap_secrets = '/run/accel-pppd/sstp.chap-secrets' + + # call base-classes classmethod + super(TestVPNSSTPServer, cls).setUpClass() + + cls.cli_set(cls, pki_path + ['ca', 'sstp', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'sstp', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, pki_path + ['certificate', 'sstp', 'private', 'key', key_data.replace('\n','')]) - def tearDown(self): - self.cli_delete(pki_path) - super().tearDown() + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, pki_path) + + super(TestVPNSSTPServer, cls).tearDownClass() def basic_config(self): - self.cli_delete(pki_path) - self.cli_set(pki_path + ['ca', 'sstp', 'certificate', cert_data]) - self.cli_set(pki_path + ['certificate', 'sstp', 'certificate', cert_data]) - self.cli_set(pki_path + ['certificate', 'sstp', 'private', 'key', key_data]) # SSL is mandatory self.set(['ssl', 'ca-certificate', 'sstp']) self.set(['ssl', 'certificate', 'sstp']) @@ -47,5 +69,15 @@ class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): super().basic_config() + def test_accel_local_authentication(self): + # Change default port + port = '8443' + self.set(['port', port]) + + super().test_accel_local_authentication() + + config = read_file(self._config_file) + self.assertIn(f'port={port}', config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_zone_policy.py b/smoketest/scripts/cli/test_zone_policy.py deleted file mode 100755 index 2c580e2f1..000000000 --- a/smoketest/scripts/cli/test_zone_policy.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2021-2022 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import unittest - -from base_vyostest_shim import VyOSUnitTestSHIM - -from vyos.util import cmd - -class TestZonePolicy(VyOSUnitTestSHIM.TestCase): - @classmethod - def setUpClass(cls): - super(TestZonePolicy, cls).setUpClass() - cls.cli_set(cls, ['firewall', 'name', 'smoketest', 'default-action', 'drop']) - - @classmethod - def tearDownClass(cls): - cls.cli_delete(cls, ['firewall']) - super(TestZonePolicy, cls).tearDownClass() - - def tearDown(self): - self.cli_delete(['zone-policy']) - self.cli_commit() - - def test_basic_zone(self): - self.cli_set(['zone-policy', 'zone', 'smoketest-eth0', 'interface', 'eth0']) - self.cli_set(['zone-policy', 'zone', 'smoketest-eth0', 'from', 'smoketest-local', 'firewall', 'name', 'smoketest']) - self.cli_set(['zone-policy', 'zone', 'smoketest-local', 'local-zone']) - self.cli_set(['zone-policy', 'zone', 'smoketest-local', 'from', 'smoketest-eth0', 'firewall', 'name', 'smoketest']) - - self.cli_commit() - - nftables_search = [ - ['chain VZONE_smoketest-eth0'], - ['chain VZONE_smoketest-local_IN'], - ['chain VZONE_smoketest-local_OUT'], - ['oifname { "eth0" }', 'jump VZONE_smoketest-eth0'], - ['jump VZONE_smoketest-local_IN'], - ['jump VZONE_smoketest-local_OUT'], - ['iifname { "eth0" }', 'jump NAME_smoketest'], - ['oifname { "eth0" }', 'jump NAME_smoketest'] - ] - - nftables_output = cmd('sudo nft list table ip filter') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) - - -if __name__ == '__main__': - unittest.main(verbosity=2) |