summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--debian/vyos-1x.postinst2
-rw-r--r--python/vyos/template.py11
-rwxr-xr-xsmoketest/scripts/cli/test_container.py112
-rwxr-xr-xsrc/conf_mode/nat.py23
-rwxr-xr-xsrc/conf_mode/nat66.py8
-rw-r--r--src/tests/test_jinja_filters.py69
-rw-r--r--src/tests/test_template.py44
7 files changed, 140 insertions, 129 deletions
diff --git a/debian/vyos-1x.postinst b/debian/vyos-1x.postinst
index cd88cf60c..f7ebec8bc 100644
--- a/debian/vyos-1x.postinst
+++ b/debian/vyos-1x.postinst
@@ -73,7 +73,7 @@ if ! grep -q '^tacacs' /etc/passwd; then
adduser --quiet tacacs${level} frr
fi
level=$(( level+1 ))
- done 2>&1 | grep -v 'User tacacs${level} already exists'
+ done 2>&1 | grep -v "User tacacs${level} already exists"
fi
# Add RADIUS operator user for RADIUS authenticated users to map to
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 77b6a5ab0..29ea0889b 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -316,20 +316,15 @@ def is_ipv6(text):
except: return False
@register_filter('first_host_address')
-def first_host_address(text):
+def first_host_address(prefix):
""" Return first usable (host) IP address from given prefix.
Example:
- 10.0.0.0/24 -> 10.0.0.1
- 2001:db8::/64 -> 2001:db8::
"""
from ipaddress import ip_interface
- from ipaddress import IPv4Network
- from ipaddress import IPv6Network
-
- addr = ip_interface(text)
- if addr.version == 4:
- return str(addr.ip +1)
- return str(addr.ip)
+ tmp = ip_interface(prefix).network
+ return str(tmp.network_address +1)
@register_filter('last_host_address')
def last_host_address(text):
diff --git a/smoketest/scripts/cli/test_container.py b/smoketest/scripts/cli/test_container.py
index b43c05fae..cdf46a6e1 100755
--- a/smoketest/scripts/cli/test_container.py
+++ b/smoketest/scripts/cli/test_container.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -19,6 +19,7 @@ import glob
import json
from base_vyostest_shim import VyOSUnitTestSHIM
+from ipaddress import ip_interface
from vyos.configsession import ConfigSessionError
from vyos.utils.process import cmd
@@ -27,8 +28,6 @@ from vyos.utils.file import read_file
base_path = ['container']
cont_image = 'busybox:stable' # busybox is included in vyos-build
-prefix = '192.168.205.0/24'
-net_name = 'NET01'
PROCESS_NAME = 'conmon'
PROCESS_PIDFILE = '/run/vyos-container-{0}.service.pid'
@@ -37,10 +36,8 @@ busybox_image_path = '/usr/share/vyos/busybox-stable.tar'
def cmd_to_json(command):
c = cmd(command + ' --format=json')
data = json.loads(c)[0]
-
return data
-
class TestContainer(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -52,6 +49,10 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
except:
cls.skipTest(cls, reason='busybox image not available')
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
@classmethod
def tearDownClass(cls):
super(TestContainer, cls).tearDownClass()
@@ -70,7 +71,7 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
units = glob.glob('/run/systemd/system/vyos-container-*')
self.assertEqual(units, [])
- def test_01_basic_container(self):
+ def test_basic(self):
cont_name = 'c1'
self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '10.0.2.15/24'])
@@ -91,24 +92,101 @@ class TestContainer(VyOSUnitTestSHIM.TestCase):
# Check for running process
self.assertEqual(process_named_running(PROCESS_NAME), pid)
- def test_02_container_network(self):
- cont_name = 'c2'
- cont_ip = '192.168.205.25'
+ def test_ipv4_network(self):
+ prefix = '192.0.2.0/24'
+ base_name = 'ipv4'
+ net_name = 'NET01'
+
self.cli_set(base_path + ['network', net_name, 'prefix', prefix])
- self.cli_set(base_path + ['name', cont_name, 'image', cont_image])
- self.cli_set(base_path + ['name', cont_name, 'network', net_name, 'address', cont_ip])
- # commit changes
+ for ii in range(1, 6):
+ name = f'{base_name}-{ii}'
+ self.cli_set(base_path + ['name', name, 'image', cont_image])
+ self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)])
+
+ # verify() - first IP address of a prefix can not be used by a container
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ tmp = f'{base_name}-1'
+ self.cli_delete(base_path + ['name', tmp])
self.cli_commit()
n = cmd_to_json(f'sudo podman network inspect {net_name}')
- json_subnet = n['subnets'][0]['subnet']
+ self.assertEqual(n['subnets'][0]['subnet'], prefix)
- c = cmd_to_json(f'sudo podman container inspect {cont_name}')
- json_ip = c['NetworkSettings']['Networks'][net_name]['IPAddress']
+ # skipt first container, it was never created
+ for ii in range(2, 6):
+ name = f'{base_name}-{ii}'
+ c = cmd_to_json(f'sudo podman container inspect {name}')
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix).ip + 1))
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'], str(ip_interface(prefix).ip + ii))
- self.assertEqual(json_subnet, prefix)
- self.assertEqual(json_ip, cont_ip)
+ def test_ipv6_network(self):
+ prefix = '2001:db8::/64'
+ base_name = 'ipv6'
+ net_name = 'NET02'
+
+ self.cli_set(base_path + ['network', net_name, 'prefix', prefix])
+
+ for ii in range(1, 6):
+ name = f'{base_name}-{ii}'
+ self.cli_set(base_path + ['name', name, 'image', cont_image])
+ self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix).ip + ii)])
+
+ # verify() - first IP address of a prefix can not be used by a container
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ tmp = f'{base_name}-1'
+ self.cli_delete(base_path + ['name', tmp])
+ self.cli_commit()
+
+ n = cmd_to_json(f'sudo podman network inspect {net_name}')
+ self.assertEqual(n['subnets'][0]['subnet'], prefix)
+
+ # skipt first container, it was never created
+ for ii in range(2, 6):
+ name = f'{base_name}-{ii}'
+ c = cmd_to_json(f'sudo podman container inspect {name}')
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix).ip + 1))
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix).ip + ii))
+
+ def test_dual_stack_network(self):
+ prefix4 = '192.0.2.0/24'
+ prefix6 = '2001:db8::/64'
+ base_name = 'dual-stack'
+ net_name = 'net-4-6'
+
+ self.cli_set(base_path + ['network', net_name, 'prefix', prefix4])
+ self.cli_set(base_path + ['network', net_name, 'prefix', prefix6])
+
+ for ii in range(1, 6):
+ name = f'{base_name}-{ii}'
+ self.cli_set(base_path + ['name', name, 'image', cont_image])
+ self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix4).ip + ii)])
+ self.cli_set(base_path + ['name', name, 'network', net_name, 'address', str(ip_interface(prefix6).ip + ii)])
+
+ # verify() - first IP address of a prefix can not be used by a container
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ tmp = f'{base_name}-1'
+ self.cli_delete(base_path + ['name', tmp])
+ self.cli_commit()
+
+ n = cmd_to_json(f'sudo podman network inspect {net_name}')
+ self.assertEqual(n['subnets'][0]['subnet'], prefix4)
+ self.assertEqual(n['subnets'][1]['subnet'], prefix6)
+
+ # skipt first container, it was never created
+ for ii in range(2, 6):
+ name = f'{base_name}-{ii}'
+ c = cmd_to_json(f'sudo podman container inspect {name}')
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPv6Gateway'] , str(ip_interface(prefix6).ip + 1))
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['GlobalIPv6Address'], str(ip_interface(prefix6).ip + ii))
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['Gateway'] , str(ip_interface(prefix4).ip + 1))
+ self.assertEqual(c['NetworkSettings']['Networks'][net_name]['IPAddress'] , str(ip_interface(prefix4).ip + ii))
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py
index 44b13d413..20570da62 100755
--- a/src/conf_mode/nat.py
+++ b/src/conf_mode/nat.py
@@ -80,15 +80,13 @@ def verify_rule(config, err_msg, groups_dict):
dict_search('source.port', config)):
if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']:
- raise ConfigError(f'{err_msg}\n' \
- 'ports can only be specified when protocol is '\
- 'either tcp, udp or tcp_udp!')
+ raise ConfigError(f'{err_msg} ports can only be specified when '\
+ 'protocol is either tcp, udp or tcp_udp!')
if is_ip_network(dict_search('translation.address', config)):
- raise ConfigError(f'{err_msg}\n' \
- 'Cannot use ports with an IPv4 network as translation address as it\n' \
- 'statically maps a whole network of addresses onto another\n' \
- 'network of addresses')
+ raise ConfigError(f'{err_msg} cannot use ports with an IPv4 network as '\
+ 'translation address as it statically maps a whole network '\
+ 'of addresses onto another network of addresses!')
for side in ['destination', 'source']:
if side in config:
@@ -152,10 +150,10 @@ def verify(nat):
if 'outbound_interface' in config:
if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']:
- raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for nat source rule "{rule}"')
+ raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"')
elif 'name' in config['outbound_interface']:
if config['outbound_interface']['name'] not in 'any' and config['outbound_interface']['name'] not in interfaces():
- Warning(f'{err_msg} - interface "{config["outbound_interface"]["name"]}" does not exist on this system')
+ Warning(f'NAT interface "{config["outbound_interface"]["name"]}" for source NAT rule "{rule}" does not exist!')
if not dict_search('translation.address', config) and not dict_search('translation.port', config):
if 'exclude' not in config and 'backend' not in config['load_balance']:
@@ -176,10 +174,10 @@ def verify(nat):
if 'inbound_interface' in config:
if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']:
- raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for destination nat rule "{rule}"')
+ raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"')
elif 'name' in config['inbound_interface']:
if config['inbound_interface']['name'] not in 'any' and config['inbound_interface']['name'] not in interfaces():
- Warning(f'{err_msg} - interface "{config["inbound_interface"]["name"]}" does not exist on this system')
+ Warning(f'NAT interface "{config["inbound_interface"]["name"]}" for destination NAT rule "{rule}" does not exist!')
if not dict_search('translation.address', config) and not dict_search('translation.port', config) and 'redirect' not in config['translation']:
if 'exclude' not in config and 'backend' not in config['load_balance']:
@@ -193,8 +191,7 @@ def verify(nat):
err_msg = f'Static NAT configuration error in rule {rule}:'
if 'inbound_interface' not in config:
- raise ConfigError(f'{err_msg}\n' \
- 'inbound-interface not specified')
+ raise ConfigError(f'{err_msg} inbound-interface not specified')
# common rule verification
verify_rule(config, err_msg, nat['firewall_group'])
diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py
index dee1551fe..4c1ead258 100755
--- a/src/conf_mode/nat66.py
+++ b/src/conf_mode/nat66.py
@@ -64,10 +64,10 @@ def verify(nat):
if 'outbound_interface' in config:
if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']:
- raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for nat source rule "{rule}"')
+ raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"')
elif 'name' in config['outbound_interface']:
if config['outbound_interface']['name'] not in 'any' and config['outbound_interface']['name'] not in interfaces():
- Warning(f'{err_msg} - interface "{config["outbound_interface"]["name"]}" does not exist on this system')
+ Warning(f'NAT66 interface "{config["outbound_interface"]["name"]}" for source NAT66 rule "{rule}" does not exist!')
addr = dict_search('translation.address', config)
if addr != None:
@@ -88,10 +88,10 @@ def verify(nat):
if 'inbound_interface' in config:
if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']:
- raise ConfigError(f'{err_msg} - Cannot specify both interface group and interface name for destination nat rule "{rule}"')
+ raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"')
elif 'name' in config['inbound_interface']:
if config['inbound_interface']['name'] not in 'any' and config['inbound_interface']['name'] not in interfaces():
- Warning(f'{err_msg} - interface "{config["inbound_interface"]["name"]}" does not exist on this system')
+ Warning(f'NAT66 interface "{config["inbound_interface"]["name"]}" for destination NAT66 rule "{rule}" does not exist!')
return None
diff --git a/src/tests/test_jinja_filters.py b/src/tests/test_jinja_filters.py
deleted file mode 100644
index 8a7241fe3..000000000
--- a/src/tests/test_jinja_filters.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from unittest import TestCase
-
-from ipaddress import ip_network
-from vyos.template import address_from_cidr
-from vyos.template import netmask_from_cidr
-from vyos.template import is_ipv4
-from vyos.template import is_ipv6
-from vyos.template import first_host_address
-from vyos.template import last_host_address
-from vyos.template import inc_ip
-
-class TestTeamplteHelpers(TestCase):
- def setUp(self):
- pass
-
- def test_helpers_from_cidr(self):
- network_v4 = '192.0.2.0/26'
- self.assertEqual(address_from_cidr(network_v4), str(ip_network(network_v4).network_address))
- self.assertEqual(netmask_from_cidr(network_v4), str(ip_network(network_v4).netmask))
-
- def test_helpers_ipv4(self):
- self.assertTrue(is_ipv4('192.0.2.1'))
- self.assertTrue(is_ipv4('192.0.2.0/24'))
- self.assertTrue(is_ipv4('192.0.2.1/32'))
- self.assertTrue(is_ipv4('10.255.1.2'))
- self.assertTrue(is_ipv4('10.255.1.0/24'))
- self.assertTrue(is_ipv4('10.255.1.2/32'))
- self.assertFalse(is_ipv4('2001:db8::'))
- self.assertFalse(is_ipv4('2001:db8::1'))
- self.assertFalse(is_ipv4('2001:db8::/64'))
-
- def test_helpers_ipv6(self):
- self.assertFalse(is_ipv6('192.0.2.1'))
- self.assertFalse(is_ipv6('192.0.2.0/24'))
- self.assertFalse(is_ipv6('192.0.2.1/32'))
- self.assertFalse(is_ipv6('10.255.1.2'))
- self.assertFalse(is_ipv6('10.255.1.0/24'))
- self.assertFalse(is_ipv6('10.255.1.2/32'))
- self.assertTrue(is_ipv6('2001:db8::'))
- self.assertTrue(is_ipv6('2001:db8::1'))
- self.assertTrue(is_ipv6('2001:db8::1/64'))
- self.assertTrue(is_ipv6('2001:db8::/32'))
- self.assertTrue(is_ipv6('2001:db8::/64'))
-
- def test_helpers_first_host_address(self):
- self.assertEqual(first_host_address('10.0.0.0/24'), '10.0.0.1')
- self.assertEqual(first_host_address('10.0.0.128/25'), '10.0.0.129')
- self.assertEqual(first_host_address('10.0.0.200/29'), '10.0.0.201')
-
- self.assertEqual(first_host_address('2001:db8::/64'), '2001:db8::')
- self.assertEqual(first_host_address('2001:db8::/112'), '2001:db8::')
- self.assertEqual(first_host_address('2001:db8::10/112'), '2001:db8::10')
- self.assertEqual(first_host_address('2001:db8::100/112'), '2001:db8::100')
diff --git a/src/tests/test_template.py b/src/tests/test_template.py
index 2d065f545..aba97015e 100644
--- a/src/tests/test_template.py
+++ b/src/tests/test_template.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -17,6 +17,7 @@
import os
import vyos.template
+from ipaddress import ip_network
from unittest import TestCase
class TestVyOSTemplate(TestCase):
@@ -67,6 +68,9 @@ class TestVyOSTemplate(TestCase):
# ValueError: 2001:db8::1/48 has host bits set
self.assertEqual(vyos.template.address_from_cidr('2001:db8::1/48'), '2001:db8::1')
+ network_v4 = '192.0.2.0/26'
+ self.assertEqual(vyos.template.address_from_cidr(network_v4), str(ip_network(network_v4).network_address))
+
def test_netmask_from_cidr(self):
self.assertEqual(vyos.template.netmask_from_cidr('192.0.2.0/24'), '255.255.255.0')
self.assertEqual(vyos.template.netmask_from_cidr('192.0.2.128/25'), '255.255.255.128')
@@ -80,28 +84,35 @@ class TestVyOSTemplate(TestCase):
# ValueError: 2001:db8:1:/64 has host bits set
self.assertEqual(vyos.template.netmask_from_cidr('2001:db8:1:/64'), 'ffff:ffff:ffff:ffff::')
+ network_v4 = '192.0.2.0/26'
+ self.assertEqual(vyos.template.netmask_from_cidr(network_v4), str(ip_network(network_v4).netmask))
+
def test_first_host_address(self):
- self.assertEqual(vyos.template.first_host_address('10.0.0.0/24'), '10.0.0.1')
- self.assertEqual(vyos.template.first_host_address('10.0.0.128/25'), '10.0.0.129')
- self.assertEqual(vyos.template.first_host_address('2001:db8::/64'), '2001:db8::')
+ self.assertEqual(vyos.template.first_host_address('10.0.0.0/24'), '10.0.0.1')
+ self.assertEqual(vyos.template.first_host_address('10.0.0.10/24'), '10.0.0.1')
+ self.assertEqual(vyos.template.first_host_address('10.0.0.255/24'), '10.0.0.1')
+ self.assertEqual(vyos.template.first_host_address('10.0.0.128/25'), '10.0.0.129')
+ self.assertEqual(vyos.template.first_host_address('2001:db8::/64'), '2001:db8::1')
+ self.assertEqual(vyos.template.first_host_address('2001:db8::1000/64'), '2001:db8::1')
+ self.assertEqual(vyos.template.first_host_address('2001:db8::ffff:ffff:ffff:ffff/64'), '2001:db8::1')
def test_last_host_address(self):
- self.assertEqual(vyos.template.last_host_address('10.0.0.0/24'), '10.0.0.254')
- self.assertEqual(vyos.template.last_host_address('10.0.0.128/25'), '10.0.0.254')
- self.assertEqual(vyos.template.last_host_address('2001:db8::/64'), '2001:db8::ffff:ffff:ffff:ffff')
+ self.assertEqual(vyos.template.last_host_address('10.0.0.0/24'), '10.0.0.254')
+ self.assertEqual(vyos.template.last_host_address('10.0.0.128/25'), '10.0.0.254')
+ self.assertEqual(vyos.template.last_host_address('2001:db8::/64'), '2001:db8::ffff:ffff:ffff:ffff')
def test_increment_ip(self):
- self.assertEqual(vyos.template.inc_ip('10.0.0.0/24', '2'), '10.0.0.2')
- self.assertEqual(vyos.template.inc_ip('10.0.0.0', '2'), '10.0.0.2')
- self.assertEqual(vyos.template.inc_ip('10.0.0.0', '10'), '10.0.0.10')
- self.assertEqual(vyos.template.inc_ip('2001:db8::/64', '2'), '2001:db8::2')
- self.assertEqual(vyos.template.inc_ip('2001:db8::', '10'), '2001:db8::a')
+ self.assertEqual(vyos.template.inc_ip('10.0.0.0/24', '2'), '10.0.0.2')
+ self.assertEqual(vyos.template.inc_ip('10.0.0.0', '2'), '10.0.0.2')
+ self.assertEqual(vyos.template.inc_ip('10.0.0.0', '10'), '10.0.0.10')
+ self.assertEqual(vyos.template.inc_ip('2001:db8::/64', '2'), '2001:db8::2')
+ self.assertEqual(vyos.template.inc_ip('2001:db8::', '10'), '2001:db8::a')
def test_decrement_ip(self):
- self.assertEqual(vyos.template.dec_ip('10.0.0.100/24', '1'), '10.0.0.99')
- self.assertEqual(vyos.template.dec_ip('10.0.0.90', '10'), '10.0.0.80')
- self.assertEqual(vyos.template.dec_ip('2001:db8::b/64', '10'), '2001:db8::1')
- self.assertEqual(vyos.template.dec_ip('2001:db8::f', '5'), '2001:db8::a')
+ self.assertEqual(vyos.template.dec_ip('10.0.0.100/24', '1'), '10.0.0.99')
+ self.assertEqual(vyos.template.dec_ip('10.0.0.90', '10'), '10.0.0.80')
+ self.assertEqual(vyos.template.dec_ip('2001:db8::b/64', '10'), '2001:db8::1')
+ self.assertEqual(vyos.template.dec_ip('2001:db8::f', '5'), '2001:db8::a')
def test_is_network(self):
self.assertFalse(vyos.template.is_ip_network('192.0.2.0'))
@@ -181,4 +192,3 @@ class TestVyOSTemplate(TestCase):
for group_name, group_config in data['ike_group'].items():
ciphers = vyos.template.get_esp_ike_cipher(group_config)
self.assertIn(IKEv2_DEFAULT, ','.join(ciphers))
-