diff options
-rw-r--r-- | debian/vyos-1x.postinst | 2 | ||||
-rw-r--r-- | python/vyos/template.py | 11 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_container.py | 112 | ||||
-rwxr-xr-x | src/conf_mode/nat.py | 23 | ||||
-rwxr-xr-x | src/conf_mode/nat66.py | 8 | ||||
-rw-r--r-- | src/tests/test_jinja_filters.py | 69 | ||||
-rw-r--r-- | src/tests/test_template.py | 44 |
7 files changed, 140 insertions, 129 deletions
diff --git a/debian/vyos-1x.postinst b/debian/vyos-1x.postinst index cd88cf60c..f7ebec8bc 100644 --- a/debian/vyos-1x.postinst +++ b/debian/vyos-1x.postinst @@ -73,7 +73,7 @@ if ! grep -q '^tacacs' /etc/passwd; then adduser --quiet tacacs${level} frr fi level=$(( level+1 )) - done 2>&1 | grep -v 'User tacacs${level} already exists' + done 2>&1 | grep -v "User tacacs${level} already exists" fi # Add RADIUS operator user for RADIUS authenticated users to map to diff --git a/python/vyos/template.py b/python/vyos/template.py index 77b6a5ab0..29ea0889b 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -316,20 +316,15 @@ def is_ipv6(text): except: return False @register_filter('first_host_address') -def first_host_address(text): +def first_host_address(prefix): """ Return first usable (host) IP address from given prefix. Example: - 10.0.0.0/24 -> 10.0.0.1 - 2001:db8::/64 -> 2001:db8:: """ from ipaddress import ip_interface - from ipaddress import IPv4Network - from ipaddress import IPv6Network - - addr = ip_interface(text) - if addr.version == 4: - return str(addr.ip +1) - return str(addr.ip) + tmp = ip_interface(prefix).network + return str(tmp.network_address +1) @register_filter('last_host_address') def last_host_address(text): diff --git a/smoketest/scripts/cli/test_container.py b/smoketest/scripts/cli/test_container.py index b43c05fae..cdf46a6e1 100755 --- a/smoketest/scripts/cli/test_container.py +++ b/smoketest/scripts/cli/test_container.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,6 +19,7 @@ import glob import json from base_vyostest_shim import VyOSUnitTestSHIM +from ipaddress import ip_interface from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd @@ -27,8 +28,6 @@ from vyos.utils.file import read_file base_path = ['container'] cont_image = 'busybox:stable' # busybox is included in vyos-build -prefix = '192.168.205.0/24' -net_name = 'NET01' PROCESS_NAME = 'conmon' PROCESS_PIDFILE = '/run/vyos-container-{0}.service.pid' @@ -37,10 +36,8 @@ busybox_image_path = '/usr/share/vyos/busybox-stable.tar' def cmd_to_json(command): c = cmd(command + ' --format=json') data = json.loads(c)[0] - return data - class TestContainer(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): @@ -52,6 +49,10 @@ class TestContainer(VyOSUnitTestSHIM.TestCase): except: cls.skipTest(cls, reason='busybox image not available') + # ensure we can also run this test on a live system - so lets clean + # out the current configuration :) + cls.cli_delete(cls, base_path) + @classmethod def tearDownClass(cls): super(TestContainer, cls).tearDownClass() @@ -70,7 +71,7 @@ class TestContainer(VyOSUnitTestSHIM.TestCase): units = glob.glob('/run/systemd/system/vyos-container-*') self.assertEqual(units, []) - def test_01_basic_container(self): + def test_basic(self): cont_name = 'c1' self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '10.0.2.15/24']) @@ -91,24 +92,101 @@ class TestContainer(VyOSUnitTestSHIM.TestCase): # Check for running process self.assertEqual(process_named_running(PROCESS_NAME), pid) - def test_02_container_network(self): - cont_name = 'c2' - cont_ip = '192.168.205.25' + def test_ipv4_network(self): + prefix = '192.0.2.0/24' + base_name = 'ipv4' + net_name = 'NET01' + self.cli_set(base_path + ['network', net_name, 'prefix', prefix]) - self.cli_set(base_path + ['name', cont_name, 'image', cont_image]) - self.cli_set(base_path + ['name', cont_name, 'network', net_name, 'address', cont_ip]) - # commit changes + for ii in range(1, 6): + name = f'{base_name}-{ii}' + self.cli_set(base_path + ['name', name, 'image', cont_image]) + self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)]) + + # verify() - first IP address of a prefix can not be used by a container + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + tmp = f'{base_name}-1' + self.cli_delete(base_path + ['name', tmp]) self.cli_commit() n = cmd_to_json(f'sudo podman network inspect {net_name}') - json_subnet = n['subnets'][0]['subnet'] + self.assertEqual(n['subnets'][0]['subnet'], prefix) - c = cmd_to_json(f'sudo podman container inspect {cont_name}') - json_ip = c['NetworkSettings']['Networks'][net_name]['IPAddress'] + # skipt first container, it was never created + for ii in range(2, 6): + name = f'{base_name}-{ii}' + c = cmd_to_json(f'sudo podman container inspect {name}') + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix).ip + 1)) + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'], str(ip_interface(prefix).ip + ii)) - self.assertEqual(json_subnet, prefix) - self.assertEqual(json_ip, cont_ip) + def test_ipv6_network(self): + prefix = '2001:db8::/64' + base_name = 'ipv6' + net_name = 'NET02' + + self.cli_set(base_path + ['network', net_name, 'prefix', prefix]) + + for ii in range(1, 6): + name = f'{base_name}-{ii}' + self.cli_set(base_path + ['name', name, 'image', cont_image]) + self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)]) + + # verify() - first IP address of a prefix can not be used by a container + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + tmp = f'{base_name}-1' + self.cli_delete(base_path + ['name', tmp]) + self.cli_commit() + + n = cmd_to_json(f'sudo podman network inspect {net_name}') + self.assertEqual(n['subnets'][0]['subnet'], prefix) + + # skipt first container, it was never created + for ii in range(2, 6): + name = f'{base_name}-{ii}' + c = cmd_to_json(f'sudo podman container inspect {name}') + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix).ip + 1)) + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix).ip + ii)) + + def test_dual_stack_network(self): + prefix4 = '192.0.2.0/24' + prefix6 = '2001:db8::/64' + base_name = 'dual-stack' + net_name = 'net-4-6' + + self.cli_set(base_path + ['network', net_name, 'prefix', prefix4]) + self.cli_set(base_path + ['network', net_name, 'prefix', prefix6]) + + for ii in range(1, 6): + name = f'{base_name}-{ii}' + self.cli_set(base_path + ['name', name, 'image', cont_image]) + self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix4).ip + ii)]) + self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix6).ip + ii)]) + + # verify() - first IP address of a prefix can not be used by a container + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + tmp = f'{base_name}-1' + self.cli_delete(base_path + ['name', tmp]) + self.cli_commit() + + n = cmd_to_json(f'sudo podman network inspect {net_name}') + self.assertEqual(n['subnets'][0]['subnet'], prefix4) + self.assertEqual(n['subnets'][1]['subnet'], prefix6) + + # skipt first container, it was never created + for ii in range(2, 6): + name = f'{base_name}-{ii}' + c = cmd_to_json(f'sudo podman container inspect {name}') + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix6).ip + 1)) + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix6).ip + ii)) + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix4).ip + 1)) + self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'] , str(ip_interface(prefix4).ip + ii)) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index 44b13d413..20570da62 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -80,15 +80,13 @@ def verify_rule(config, err_msg, groups_dict): dict_search('source.port', config)): if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: - raise ConfigError(f'{err_msg}\n' \ - 'ports can only be specified when protocol is '\ - 'either tcp, udp or tcp_udp!') + raise ConfigError(f'{err_msg} ports can only be specified when '\ + 'protocol is either tcp, udp or tcp_udp!') if is_ip_network(dict_search('translation.address', config)): - raise ConfigError(f'{err_msg}\n' \ - 'Cannot use ports with an IPv4 network as translation address as it\n' \ - 'statically maps a whole network of addresses onto another\n' \ - 'network of addresses') + raise ConfigError(f'{err_msg} cannot use ports with an IPv4 network as '\ + 'translation address as it statically maps a whole network '\ + 'of addresses onto another network of addresses!') for side in ['destination', 'source']: if side in config: @@ -152,10 +150,10 @@ def verify(nat): if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: - raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for nat source rule "{rule}"') + raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: if config['outbound_interface']['name'] not in 'any' and config['outbound_interface']['name'] not in interfaces(): - Warning(f'{err_msg} - interface "{config["outbound_interface"]["name"]}" does not exist on this system') + Warning(f'NAT interface "{config["outbound_interface"]["name"]}" for source NAT rule "{rule}" does not exist!') if not dict_search('translation.address', config) and not dict_search('translation.port', config): if 'exclude' not in config and 'backend' not in config['load_balance']: @@ -176,10 +174,10 @@ def verify(nat): if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: - raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for destination nat rule "{rule}"') + raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: if config['inbound_interface']['name'] not in 'any' and config['inbound_interface']['name'] not in interfaces(): - Warning(f'{err_msg} - interface "{config["inbound_interface"]["name"]}" does not exist on this system') + Warning(f'NAT interface "{config["inbound_interface"]["name"]}" for destination NAT rule "{rule}" does not exist!') if not dict_search('translation.address', config) and not dict_search('translation.port', config) and 'redirect' not in config['translation']: if 'exclude' not in config and 'backend' not in config['load_balance']: @@ -193,8 +191,7 @@ def verify(nat): err_msg = f'Static NAT configuration error in rule {rule}:' if 'inbound_interface' not in config: - raise ConfigError(f'{err_msg}\n' \ - 'inbound-interface not specified') + raise ConfigError(f'{err_msg} inbound-interface not specified') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py index dee1551fe..4c1ead258 100755 --- a/src/conf_mode/nat66.py +++ b/src/conf_mode/nat66.py @@ -64,10 +64,10 @@ def verify(nat): if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: - raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for nat source rule "{rule}"') + raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: if config['outbound_interface']['name'] not in 'any' and config['outbound_interface']['name'] not in interfaces(): - Warning(f'{err_msg} - interface "{config["outbound_interface"]["name"]}" does not exist on this system') + Warning(f'NAT66 interface "{config["outbound_interface"]["name"]}" for source NAT66 rule "{rule}" does not exist!') addr = dict_search('translation.address', config) if addr != None: @@ -88,10 +88,10 @@ def verify(nat): if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: - raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for destination nat rule "{rule}"') + raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: if config['inbound_interface']['name'] not in 'any' and config['inbound_interface']['name'] not in interfaces(): - Warning(f'{err_msg} - interface "{config["inbound_interface"]["name"]}" does not exist on this system') + Warning(f'NAT66 interface "{config["inbound_interface"]["name"]}" for destination NAT66 rule "{rule}" does not exist!') return None diff --git a/src/tests/test_jinja_filters.py b/src/tests/test_jinja_filters.py deleted file mode 100644 index 8a7241fe3..000000000 --- a/src/tests/test_jinja_filters.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from unittest import TestCase - -from ipaddress import ip_network -from vyos.template import address_from_cidr -from vyos.template import netmask_from_cidr -from vyos.template import is_ipv4 -from vyos.template import is_ipv6 -from vyos.template import first_host_address -from vyos.template import last_host_address -from vyos.template import inc_ip - -class TestTeamplteHelpers(TestCase): - def setUp(self): - pass - - def test_helpers_from_cidr(self): - network_v4 = '192.0.2.0/26' - self.assertEqual(address_from_cidr(network_v4), str(ip_network(network_v4).network_address)) - self.assertEqual(netmask_from_cidr(network_v4), str(ip_network(network_v4).netmask)) - - def test_helpers_ipv4(self): - self.assertTrue(is_ipv4('192.0.2.1')) - self.assertTrue(is_ipv4('192.0.2.0/24')) - self.assertTrue(is_ipv4('192.0.2.1/32')) - self.assertTrue(is_ipv4('10.255.1.2')) - self.assertTrue(is_ipv4('10.255.1.0/24')) - self.assertTrue(is_ipv4('10.255.1.2/32')) - self.assertFalse(is_ipv4('2001:db8::')) - self.assertFalse(is_ipv4('2001:db8::1')) - self.assertFalse(is_ipv4('2001:db8::/64')) - - def test_helpers_ipv6(self): - self.assertFalse(is_ipv6('192.0.2.1')) - self.assertFalse(is_ipv6('192.0.2.0/24')) - self.assertFalse(is_ipv6('192.0.2.1/32')) - self.assertFalse(is_ipv6('10.255.1.2')) - self.assertFalse(is_ipv6('10.255.1.0/24')) - self.assertFalse(is_ipv6('10.255.1.2/32')) - self.assertTrue(is_ipv6('2001:db8::')) - self.assertTrue(is_ipv6('2001:db8::1')) - self.assertTrue(is_ipv6('2001:db8::1/64')) - self.assertTrue(is_ipv6('2001:db8::/32')) - self.assertTrue(is_ipv6('2001:db8::/64')) - - def test_helpers_first_host_address(self): - self.assertEqual(first_host_address('10.0.0.0/24'), '10.0.0.1') - self.assertEqual(first_host_address('10.0.0.128/25'), '10.0.0.129') - self.assertEqual(first_host_address('10.0.0.200/29'), '10.0.0.201') - - self.assertEqual(first_host_address('2001:db8::/64'), '2001:db8::') - self.assertEqual(first_host_address('2001:db8::/112'), '2001:db8::') - self.assertEqual(first_host_address('2001:db8::10/112'), '2001:db8::10') - self.assertEqual(first_host_address('2001:db8::100/112'), '2001:db8::100') diff --git a/src/tests/test_template.py b/src/tests/test_template.py index 2d065f545..aba97015e 100644 --- a/src/tests/test_template.py +++ b/src/tests/test_template.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -17,6 +17,7 @@ import os import vyos.template +from ipaddress import ip_network from unittest import TestCase class TestVyOSTemplate(TestCase): @@ -67,6 +68,9 @@ class TestVyOSTemplate(TestCase): # ValueError: 2001:db8::1/48 has host bits set self.assertEqual(vyos.template.address_from_cidr('2001:db8::1/48'), '2001:db8::1') + network_v4 = '192.0.2.0/26' + self.assertEqual(vyos.template.address_from_cidr(network_v4), str(ip_network(network_v4).network_address)) + def test_netmask_from_cidr(self): self.assertEqual(vyos.template.netmask_from_cidr('192.0.2.0/24'), '255.255.255.0') self.assertEqual(vyos.template.netmask_from_cidr('192.0.2.128/25'), '255.255.255.128') @@ -80,28 +84,35 @@ class TestVyOSTemplate(TestCase): # ValueError: 2001:db8:1:/64 has host bits set self.assertEqual(vyos.template.netmask_from_cidr('2001:db8:1:/64'), 'ffff:ffff:ffff:ffff::') + network_v4 = '192.0.2.0/26' + self.assertEqual(vyos.template.netmask_from_cidr(network_v4), str(ip_network(network_v4).netmask)) + def test_first_host_address(self): - self.assertEqual(vyos.template.first_host_address('10.0.0.0/24'), '10.0.0.1') - self.assertEqual(vyos.template.first_host_address('10.0.0.128/25'), '10.0.0.129') - self.assertEqual(vyos.template.first_host_address('2001:db8::/64'), '2001:db8::') + self.assertEqual(vyos.template.first_host_address('10.0.0.0/24'), '10.0.0.1') + self.assertEqual(vyos.template.first_host_address('10.0.0.10/24'), '10.0.0.1') + self.assertEqual(vyos.template.first_host_address('10.0.0.255/24'), '10.0.0.1') + self.assertEqual(vyos.template.first_host_address('10.0.0.128/25'), '10.0.0.129') + self.assertEqual(vyos.template.first_host_address('2001:db8::/64'), '2001:db8::1') + self.assertEqual(vyos.template.first_host_address('2001:db8::1000/64'), '2001:db8::1') + self.assertEqual(vyos.template.first_host_address('2001:db8::ffff:ffff:ffff:ffff/64'), '2001:db8::1') def test_last_host_address(self): - self.assertEqual(vyos.template.last_host_address('10.0.0.0/24'), '10.0.0.254') - self.assertEqual(vyos.template.last_host_address('10.0.0.128/25'), '10.0.0.254') - self.assertEqual(vyos.template.last_host_address('2001:db8::/64'), '2001:db8::ffff:ffff:ffff:ffff') + self.assertEqual(vyos.template.last_host_address('10.0.0.0/24'), '10.0.0.254') + self.assertEqual(vyos.template.last_host_address('10.0.0.128/25'), '10.0.0.254') + self.assertEqual(vyos.template.last_host_address('2001:db8::/64'), '2001:db8::ffff:ffff:ffff:ffff') def test_increment_ip(self): - self.assertEqual(vyos.template.inc_ip('10.0.0.0/24', '2'), '10.0.0.2') - self.assertEqual(vyos.template.inc_ip('10.0.0.0', '2'), '10.0.0.2') - self.assertEqual(vyos.template.inc_ip('10.0.0.0', '10'), '10.0.0.10') - self.assertEqual(vyos.template.inc_ip('2001:db8::/64', '2'), '2001:db8::2') - self.assertEqual(vyos.template.inc_ip('2001:db8::', '10'), '2001:db8::a') + self.assertEqual(vyos.template.inc_ip('10.0.0.0/24', '2'), '10.0.0.2') + self.assertEqual(vyos.template.inc_ip('10.0.0.0', '2'), '10.0.0.2') + self.assertEqual(vyos.template.inc_ip('10.0.0.0', '10'), '10.0.0.10') + self.assertEqual(vyos.template.inc_ip('2001:db8::/64', '2'), '2001:db8::2') + self.assertEqual(vyos.template.inc_ip('2001:db8::', '10'), '2001:db8::a') def test_decrement_ip(self): - self.assertEqual(vyos.template.dec_ip('10.0.0.100/24', '1'), '10.0.0.99') - self.assertEqual(vyos.template.dec_ip('10.0.0.90', '10'), '10.0.0.80') - self.assertEqual(vyos.template.dec_ip('2001:db8::b/64', '10'), '2001:db8::1') - self.assertEqual(vyos.template.dec_ip('2001:db8::f', '5'), '2001:db8::a') + self.assertEqual(vyos.template.dec_ip('10.0.0.100/24', '1'), '10.0.0.99') + self.assertEqual(vyos.template.dec_ip('10.0.0.90', '10'), '10.0.0.80') + self.assertEqual(vyos.template.dec_ip('2001:db8::b/64', '10'), '2001:db8::1') + self.assertEqual(vyos.template.dec_ip('2001:db8::f', '5'), '2001:db8::a') def test_is_network(self): self.assertFalse(vyos.template.is_ip_network('192.0.2.0')) @@ -181,4 +192,3 @@ class TestVyOSTemplate(TestCase): for group_name, group_config in data['ike_group'].items(): ciphers = vyos.template.get_esp_ike_cipher(group_config) self.assertIn(IKEv2_DEFAULT, ','.join(ciphers)) - |