summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_accel_ppp_test.py28
-rwxr-xr-xsmoketest/scripts/cli/test_firewall.py39
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_ethernet.py12
-rwxr-xr-xsmoketest/scripts/cli/test_load_balancning_wan.py1
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py141
-rwxr-xr-xsmoketest/scripts/cli/test_nat66.py63
-rwxr-xr-xsmoketest/scripts/cli/test_policy.py198
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_nhrp.py2
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospf.py62
-rwxr-xr-xsmoketest/scripts/cli/test_service_ids.py6
-rwxr-xr-xsmoketest/scripts/cli/test_service_ipoe-server.py91
-rwxr-xr-xsmoketest/scripts/cli/test_service_pppoe-server.py24
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_sstp.py54
13 files changed, 509 insertions, 212 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py
index b2acb03cc..471bdaffb 100644
--- a/smoketest/scripts/cli/base_accel_ppp_test.py
+++ b/smoketest/scripts/cli/base_accel_ppp_test.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -27,6 +27,17 @@ from vyos.util import process_named_running
class BasicAccelPPPTest:
class TestCase(VyOSUnitTestSHIM.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls._process_name = 'accel-pppd'
+
+ super(BasicAccelPPPTest.TestCase, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, cls._base_path)
+
def setUp(self):
self._gateway = '192.0.2.1'
# ensure we can also run this test on a live system - so lets clean
@@ -34,9 +45,15 @@ class BasicAccelPPPTest:
self.cli_delete(self._base_path)
def tearDown(self):
+ # Check for running process
+ self.assertTrue(process_named_running(self._process_name))
+
self.cli_delete(self._base_path)
self.cli_commit()
+ # Check for running process
+ self.assertFalse(process_named_running(self._process_name))
+
def set(self, path):
self.cli_set(self._base_path + path)
@@ -113,9 +130,6 @@ class BasicAccelPPPTest:
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
- # Check for running process
- self.assertTrue(process_named_running(self._process_name))
-
# Check local-users default value(s)
self.delete(['authentication', 'local-users', 'username', user, 'static-ip'])
# commit changes
@@ -127,9 +141,6 @@ class BasicAccelPPPTest:
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
- # Check for running process
- self.assertTrue(process_named_running(self._process_name))
-
def test_accel_radius_authentication(self):
# Test configuration of RADIUS authentication for PPPoE server
self.basic_config()
@@ -186,9 +197,6 @@ class BasicAccelPPPTest:
self.assertEqual(f'req-limit=0', server[4])
self.assertEqual(f'fail-time=0', server[5])
- # Check for running process
- self.assertTrue(process_named_running(self._process_name))
-
#
# Disable Radius Accounting
#
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py
index 8e4aac788..821925bcd 100755
--- a/smoketest/scripts/cli/test_firewall.py
+++ b/smoketest/scripts/cli/test_firewall.py
@@ -209,6 +209,10 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'name', name, 'rule', '5', 'protocol', 'tcp'])
self.cli_set(['firewall', 'name', name, 'rule', '5', 'tcp', 'flags', 'syn'])
self.cli_set(['firewall', 'name', name, 'rule', '5', 'tcp', 'mss', mss_range])
+ self.cli_set(['firewall', 'name', name, 'rule', '5', 'inbound-interface', interface])
+ self.cli_set(['firewall', 'name', name, 'rule', '6', 'action', 'return'])
+ self.cli_set(['firewall', 'name', name, 'rule', '6', 'protocol', 'gre'])
+ self.cli_set(['firewall', 'name', name, 'rule', '6', 'outbound-interface', interface])
self.cli_set(['firewall', 'interface', interface, 'in', 'name', name])
@@ -221,13 +225,15 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
['tcp dport 22', 'limit rate 5/minute', 'return'],
['log prefix "[smoketest-default-D]"','smoketest default-action', 'drop'],
['tcp dport 22', 'add @RECENT_smoketest_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'drop'],
- ['tcp flags & syn == syn', f'tcp option maxseg size {mss_range}'],
+ ['tcp flags & syn == syn', f'tcp option maxseg size {mss_range}', f'iifname "{interface}"'],
+ ['meta l4proto gre', f'oifname "{interface}"', 'return']
]
self.verify_nftables(nftables_search, 'ip vyos_filter')
def test_ipv4_advanced(self):
name = 'smoketest-adv'
+ name2 = 'smoketest-adv2'
interface = 'eth0'
self.cli_set(['firewall', 'name', name, 'default-action', 'drop'])
@@ -246,6 +252,13 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'name', name, 'rule', '7', 'dscp', '3-11'])
self.cli_set(['firewall', 'name', name, 'rule', '7', 'dscp-exclude', '21-25'])
+ self.cli_set(['firewall', 'name', name2, 'default-action', 'jump'])
+ self.cli_set(['firewall', 'name', name2, 'default-jump-target', name])
+ self.cli_set(['firewall', 'name', name2, 'enable-default-log'])
+ self.cli_set(['firewall', 'name', name2, 'rule', '1', 'source', 'address', '198.51.100.1'])
+ self.cli_set(['firewall', 'name', name2, 'rule', '1', 'action', 'jump'])
+ self.cli_set(['firewall', 'name', name2, 'rule', '1', 'jump-target', name])
+
self.cli_set(['firewall', 'interface', interface, 'in', 'name', name])
self.cli_commit()
@@ -254,7 +267,9 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
[f'iifname "{interface}"', f'jump NAME_{name}'],
['ip length { 64, 512, 1024 }', 'ip dscp { 0x11, 0x34 }', 'return'],
['ip length 1-30000', 'ip length != 60000-65535', 'ip dscp 0x03-0x0b', 'ip dscp != 0x15-0x19', 'return'],
- [f'log prefix "[{name}-default-D]"', 'drop']
+ [f'log prefix "[{name}-default-D]"', 'drop'],
+ ['ip saddr 198.51.100.1', f'jump NAME_{name}'],
+ [f'log prefix "[{name2}-default-J]"', f'jump NAME_{name}']
]
self.verify_nftables(nftables_search, 'ip vyos_filter')
@@ -275,6 +290,11 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'action', 'reject'])
self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'protocol', 'tcp_udp'])
self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'destination', 'port', '8888'])
+ self.cli_set(['firewall', 'ipv6-name', name, 'rule', '2', 'inbound-interface', interface])
+
+ self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'action', 'return'])
+ self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'protocol', 'gre'])
+ self.cli_set(['firewall', 'ipv6-name', name, 'rule', '3', 'outbound-interface', interface])
self.cli_set(['firewall', 'interface', interface, 'in', 'ipv6-name', name])
@@ -283,7 +303,8 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
nftables_search = [
[f'iifname "{interface}"', f'jump NAME6_{name}'],
['saddr 2002::1', 'daddr 2002::1:1', 'log prefix "[v6-smoketest-1-A]" level crit', 'return'],
- ['meta l4proto { tcp, udp }', 'th dport 8888', 'reject'],
+ ['meta l4proto { tcp, udp }', 'th dport 8888', f'iifname "{interface}"', 'reject'],
+ ['meta l4proto gre', f'oifname "{interface}"', 'return'],
['smoketest default-action', f'log prefix "[{name}-default-D]"', 'drop']
]
@@ -291,6 +312,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
def test_ipv6_advanced(self):
name = 'v6-smoketest-adv'
+ name2 = 'v6-smoketest-adv2'
interface = 'eth0'
self.cli_set(['firewall', 'ipv6-name', name, 'default-action', 'drop'])
@@ -309,6 +331,13 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'dscp', '4-14'])
self.cli_set(['firewall', 'ipv6-name', name, 'rule', '4', 'dscp-exclude', '31-35'])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'default-action', 'jump'])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'default-jump-target', name])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'enable-default-log'])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'source', 'address', '2001:db8::/64'])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'action', 'jump'])
+ self.cli_set(['firewall', 'ipv6-name', name2, 'rule', '1', 'jump-target', name])
+
self.cli_set(['firewall', 'interface', interface, 'in', 'ipv6-name', name])
self.cli_commit()
@@ -317,7 +346,9 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
[f'iifname "{interface}"', f'jump NAME6_{name}'],
['ip6 length { 65, 513, 1025 }', 'ip6 dscp { af21, 0x35 }', 'return'],
['ip6 length 1-1999', 'ip6 length != 60000-65535', 'ip6 dscp 0x04-0x0e', 'ip6 dscp != 0x1f-0x23', 'return'],
- [f'log prefix "[{name}-default-D]"', 'drop']
+ [f'log prefix "[{name}-default-D]"', 'drop'],
+ ['ip6 saddr 2001:db8::/64', f'jump NAME6_{name}'],
+ [f'log prefix "[{name2}-default-J]"', f'jump NAME6_{name}']
]
self.verify_nftables(nftables_search, 'ip6 vyos_filter')
diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py
index 5049bd5b0..ed611062a 100755
--- a/smoketest/scripts/cli/test_interfaces_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_ethernet.py
@@ -120,15 +120,13 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase):
cls._base_path = ['interfaces', 'ethernet']
cls._mirror_interfaces = ['dum21354']
- # we need to filter out VLAN interfaces identified by a dot (.)
- # in their name - just in case!
+ # We only test on physical interfaces and not VLAN (sub-)interfaces
if 'TEST_ETH' in os.environ:
tmp = os.environ['TEST_ETH'].split()
cls._interfaces = tmp
else:
- for tmp in Section.interfaces('ethernet'):
- if not '.' in tmp:
- cls._interfaces.append(tmp)
+ for tmp in Section.interfaces('ethernet', vlan=False):
+ cls._interfaces.append(tmp)
cls._macs = {}
for interface in cls._interfaces:
@@ -205,7 +203,6 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase):
tmp = read_file(f'/proc/sys/net/core/rps_sock_flow_entries')
self.assertEqual(int(tmp), global_rfs_flow)
-
# delete configuration of RFS and check all values returned to default "0"
for interface in self._interfaces:
self.cli_delete(self._base_path + [interface, 'offload', 'rfs'])
@@ -219,9 +216,6 @@ class EthernetInterfaceTest(BasicInterfaceTest.TestCase):
tmp = read_file(f'/sys/class/net/{interface}/queues/rx-{i}/rps_flow_cnt')
self.assertEqual(int(tmp), 0)
- tmp = read_file(f'/proc/sys/net/core/rps_sock_flow_entries')
- self.assertEqual(int(tmp), 0)
-
def test_non_existing_interface(self):
unknonw_interface = self._base_path + ['eth667']
diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py
index 303dece86..23020b9b1 100755
--- a/smoketest/scripts/cli/test_load_balancning_wan.py
+++ b/smoketest/scripts/cli/test_load_balancning_wan.py
@@ -177,6 +177,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
}"""
nat_vyos_pre_snat_hook = """table ip nat {
chain VYOS_PRE_SNAT_HOOK {
+ type nat hook postrouting priority srcnat - 1; policy accept;
counter jump WANLOADBALANCE
return
}
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index 408facfb3..f824838c0 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -26,6 +26,7 @@ from vyos.util import dict_search
base_path = ['nat']
src_path = base_path + ['source']
dst_path = base_path + ['destination']
+static_path = base_path + ['static']
class TestNAT(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -40,10 +41,24 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
+ def verify_nftables(self, nftables_search, table, inverse=False, args=''):
+ nftables_output = cmd(f'sudo nft {args} list table {table}')
+
+ for search in nftables_search:
+ matched = False
+ for line in nftables_output.split("\n"):
+ if all(item in line for item in search):
+ matched = True
+ break
+ self.assertTrue(not matched if inverse else matched, msg=search)
+
def test_snat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
outbound_iface_100 = 'eth0'
outbound_iface_200 = 'eth1'
+
+ nftables_search = ['jump VYOS_PRE_SNAT_HOOK']
+
for rule in rules:
network = f'192.168.{rule}.0/24'
# depending of rule order we check either for source address for NAT
@@ -52,51 +67,16 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_set(src_path + ['rule', rule, 'source', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade'])
else:
self.cli_set(src_path + ['rule', rule, 'destination', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
self.cli_set(src_path + ['rule', rule, 'exclude'])
+ nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return'])
self.cli_commit()
- tmp = cmd('sudo nft -j list chain ip nat POSTROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK')
- else:
- rule = str(rules[idx - 1])
- network = f'192.168.{rule}.0/24'
-
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['comment'], f'SRC-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- address = dict_search('match.right.prefix.addr', data['expr'][1])
- mask = dict_search('match.right.prefix.len', data['expr'][1])
-
- if int(rule) < 200:
- self.assertEqual(direction, 'saddr')
- self.assertEqual(iface, outbound_iface_100)
- # check for masquerade keyword
- self.assertIn('masquerade', data['expr'][3])
- else:
- self.assertEqual(direction, 'daddr')
- self.assertEqual(iface, outbound_iface_200)
- # check for return keyword due to 'exclude'
- self.assertIn('return', data['expr'][3])
-
- self.assertEqual(f'{address}/{mask}', network)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_dnat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
@@ -105,56 +85,29 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
inbound_proto_100 = 'udp'
inbound_proto_200 = 'tcp'
+ nftables_search = ['jump VYOS_PRE_DNAT_HOOK']
+
for rule in rules:
port = f'10{rule}'
self.cli_set(dst_path + ['rule', rule, 'source', 'port', port])
self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port])
+ rule_search = [f'dnat to 192.0.2.1:{port}']
if int(rule) < 200:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ rule_search.append(f'{inbound_proto_100} sport {port}')
+ rule_search.append(f'iifname "{inbound_iface_100}"')
else:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ rule_search.append(f'iifname "{inbound_iface_200}"')
- self.cli_commit()
-
- tmp = cmd('sudo nft -j list chain ip nat PREROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
+ nftables_search.append(rule_search)
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK')
- else:
+ self.cli_commit()
- rule = str(rules[idx - 1])
- port = int(f'10{rule}')
-
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- protocol = dict_search('match.left.payload.protocol', data['expr'][1])
- dnat_addr = dict_search('dnat.addr', data['expr'][3])
- dnat_port = dict_search('dnat.port', data['expr'][3])
-
- self.assertEqual(direction, 'sport')
- self.assertEqual(dnat_addr, '192.0.2.1')
- self.assertEqual(dnat_port, port)
- if int(rule) < 200:
- self.assertEqual(iface, inbound_iface_100)
- self.assertEqual(protocol, inbound_proto_100)
- else:
- self.assertEqual(iface, inbound_iface_200)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
@@ -193,8 +146,48 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
# without any rule
self.cli_set(src_path)
self.cli_set(dst_path)
+ self.cli_set(static_path)
+ self.cli_commit()
+
+ def test_dnat_without_translation_address(self):
+ self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
+ self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443'])
+ self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp'])
+ self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443'])
+
+ self.cli_commit()
+
+ nftables_search = [
+ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
+
+ def test_static_nat(self):
+ dst_addr_1 = '10.0.1.1'
+ translate_addr_1 = '192.168.1.1'
+ dst_addr_2 = '203.0.113.0/24'
+ translate_addr_2 = '192.0.2.0/24'
+ ifname = 'eth0'
+
+ self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1])
+ self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1])
+
+ self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2])
+ self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2])
+
self.cli_commit()
+ nftables_search = [
+ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'],
+ [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'],
+ [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'],
+ [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_static_nat')
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py
index 537b094a4..6cf7ca0a1 100755
--- a/smoketest/scripts/cli/test_nat66.py
+++ b/smoketest/scripts/cli/test_nat66.py
@@ -71,12 +71,12 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
nftables_search = [
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'snat prefix to fc01::/64'],
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'masquerade'],
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'return']
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'],
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'],
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_source_nat66_address(self):
source_prefix = 'fc00::/64'
@@ -88,25 +88,11 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
# check validate() - outbound-interface must be defined
self.cli_commit()
- tmp = cmd('sudo nft -j list table ip6 nat')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
-
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['family'], 'ip6')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- address = dict_search('match.right.prefix.addr', data['expr'][2])
- mask = dict_search('match.right.prefix.len', data['expr'][2])
- snat_address = dict_search('snat.addr', data['expr'][3])
+ nftables_search = [
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}']
+ ]
- self.assertEqual(iface, 'eth1')
- # check for translation address
- self.assertEqual(snat_address, translation_address)
- self.assertEqual(f'{address}/{mask}', source_prefix)
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66(self):
destination_address = 'fc00::1'
@@ -129,7 +115,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66_protocol(self):
translation_address = '2001:db8:1111::1'
@@ -153,7 +139,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to 2001:db8:1111::1:5555']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66_prefix(self):
destination_prefix = 'fc00::/64'
@@ -165,22 +151,25 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
# check validate() - outbound-interface must be defined
self.cli_commit()
- tmp = cmd('sudo nft -j list table ip6 nat')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
+ nftables_search = [
+ ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
- for idx in range(0, len(data_json)):
- data = data_json[idx]
+ def test_destination_nat66_without_translation_address(self):
+ self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
+ self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443'])
+ self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp'])
+ self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443'])
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['family'], 'ip6')
- self.assertEqual(data['table'], 'nat')
+ self.cli_commit()
- iface = dict_search('match.right', data['expr'][0])
- translation_address = dict_search('dnat.addr.prefix.addr', data['expr'][3])
- translation_mask = dict_search('dnat.addr.prefix.len', data['expr'][3])
+ nftables_search = [
+ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443']
+ ]
- self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix)
- self.assertEqual(iface, 'eth1')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_source_nat66_required_translation_prefix(self):
# T2813: Ensure translation address is specified
@@ -222,7 +211,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to 2001:db8:1111::1:80']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_nat66_no_rules(self):
# T3206: deleting all rules but keep the direction 'destination' or
diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py
index 3d37d22ae..2166e63ec 100755
--- a/smoketest/scripts/cli/test_policy.py
+++ b/smoketest/scripts/cli/test_policy.py
@@ -698,6 +698,184 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
for rule in test_range:
tmp = f'ip prefix-list {prefix_list} seq {rule} permit {prefix} le {rule}'
self.assertIn(tmp, config)
+ def test_route_map_community_set(self):
+ test_data = {
+ "community-configuration": {
+ "rule": {
+ "10": {
+ "action": "permit",
+ "set": {
+ "community": {
+ "replace": [
+ "65000:10",
+ "65001:11"
+ ]
+ },
+ "extcommunity": {
+ "bandwidth": "200",
+ "rt": [
+ "65000:10",
+ "192.168.0.1:11"
+ ],
+ "soo": [
+ "192.168.0.1:11",
+ "65000:10"
+ ]
+ },
+ "large-community": {
+ "replace": [
+ "65000:65000:10",
+ "65000:65000:11"
+ ]
+ }
+ }
+ },
+ "20": {
+ "action": "permit",
+ "set": {
+ "community": {
+ "add": [
+ "65000:10",
+ "65001:11"
+ ]
+ },
+ "extcommunity": {
+ "bandwidth": "200",
+ "bandwidth-non-transitive": {}
+ },
+ "large-community": {
+ "add": [
+ "65000:65000:10",
+ "65000:65000:11"
+ ]
+ }
+ }
+ },
+ "30": {
+ "action": "permit",
+ "set": {
+ "community": {
+ "none": {}
+ },
+ "extcommunity": {
+ "none": {}
+ },
+ "large-community": {
+ "none": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ for route_map, route_map_config in test_data.items():
+ path = base_path + ['route-map', route_map]
+ self.cli_set(path + ['description', f'VyOS ROUTE-MAP {route_map}'])
+ if 'rule' not in route_map_config:
+ continue
+
+ for rule, rule_config in route_map_config['rule'].items():
+ if 'action' in rule_config:
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
+ if 'set' in rule_config:
+
+ #Add community in configuration
+ if 'community' in rule_config['set']:
+ if 'none' in rule_config['set']['community']:
+ self.cli_set(path + ['rule', rule, 'set', 'community', 'none'])
+ else:
+ community_path = path + ['rule', rule, 'set', 'community']
+ if 'add' in rule_config['set']['community']:
+ for community_unit in rule_config['set']['community']['add']:
+ self.cli_set(community_path + ['add', community_unit])
+ if 'replace' in rule_config['set']['community']:
+ for community_unit in rule_config['set']['community']['replace']:
+ self.cli_set(community_path + ['replace', community_unit])
+
+ #Add large-community in configuration
+ if 'large-community' in rule_config['set']:
+ if 'none' in rule_config['set']['large-community']:
+ self.cli_set(path + ['rule', rule, 'set', 'large-community', 'none'])
+ else:
+ community_path = path + ['rule', rule, 'set', 'large-community']
+ if 'add' in rule_config['set']['large-community']:
+ for community_unit in rule_config['set']['large-community']['add']:
+ self.cli_set(community_path + ['add', community_unit])
+ if 'replace' in rule_config['set']['large-community']:
+ for community_unit in rule_config['set']['large-community']['replace']:
+ self.cli_set(community_path + ['replace', community_unit])
+
+ #Add extcommunity in configuration
+ if 'extcommunity' in rule_config['set']:
+ if 'none' in rule_config['set']['extcommunity']:
+ self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'none'])
+ else:
+ if 'bandwidth' in rule_config['set']['extcommunity']:
+ self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'bandwidth', rule_config['set']['extcommunity']['bandwidth']])
+ if 'bandwidth-non-transitive' in rule_config['set']['extcommunity']:
+ self.cli_set(path + ['rule', rule, 'set','extcommunity', 'bandwidth-non-transitive'])
+ if 'rt' in rule_config['set']['extcommunity']:
+ for community_unit in rule_config['set']['extcommunity']['rt']:
+ self.cli_set(path + ['rule', rule, 'set', 'extcommunity','rt',community_unit])
+ if 'soo' in rule_config['set']['extcommunity']:
+ for community_unit in rule_config['set']['extcommunity']['soo']:
+ self.cli_set(path + ['rule', rule, 'set', 'extcommunity','soo',community_unit])
+ self.cli_commit()
+
+ for route_map, route_map_config in test_data.items():
+ if 'rule' not in route_map_config:
+ continue
+ for rule, rule_config in route_map_config['rule'].items():
+ name = f'route-map {route_map} {rule_config["action"]} {rule}'
+ config = self.getFRRconfig(name)
+ self.assertIn(name, config)
+
+ if 'set' in rule_config:
+ #Check community
+ if 'community' in rule_config['set']:
+ if 'none' in rule_config['set']['community']:
+ tmp = f'set community none'
+ self.assertIn(tmp, config)
+ if 'replace' in rule_config['set']['community']:
+ values = ' '.join(rule_config['set']['community']['replace'])
+ tmp = f'set community {values}'
+ self.assertIn(tmp, config)
+ if 'add' in rule_config['set']['community']:
+ values = ' '.join(rule_config['set']['community']['add'])
+ tmp = f'set community {values} additive'
+ self.assertIn(tmp, config)
+ #Check large-community
+ if 'large-community' in rule_config['set']:
+ if 'none' in rule_config['set']['large-community']:
+ tmp = f'set large-community none'
+ self.assertIn(tmp, config)
+ if 'replace' in rule_config['set']['large-community']:
+ values = ' '.join(rule_config['set']['large-community']['replace'])
+ tmp = f'set large-community {values}'
+ self.assertIn(tmp, config)
+ if 'add' in rule_config['set']['large-community']:
+ values = ' '.join(rule_config['set']['large-community']['add'])
+ tmp = f'set large-community {values} additive'
+ self.assertIn(tmp, config)
+ #Check extcommunity
+ if 'extcommunity' in rule_config['set']:
+ if 'none' in rule_config['set']['extcommunity']:
+ tmp = 'set extcommunity none'
+ self.assertIn(tmp, config)
+ if 'bandwidth' in rule_config['set']['extcommunity']:
+ values = rule_config['set']['extcommunity']['bandwidth']
+ tmp = f'set extcommunity bandwidth {values}'
+ if 'bandwidth-non-transitive' in rule_config['set']['extcommunity']:
+ tmp = tmp + ' non-transitive'
+ self.assertIn(tmp, config)
+ if 'rt' in rule_config['set']['extcommunity']:
+ values = ' '.join(rule_config['set']['extcommunity']['rt'])
+ tmp = f'set extcommunity rt {values}'
+ self.assertIn(tmp, config)
+ if 'soo' in rule_config['set']['extcommunity']:
+ values = ' '.join(rule_config['set']['extcommunity']['soo'])
+ tmp = f'set extcommunity soo {values}'
+ self.assertIn(tmp, config)
def test_route_map(self):
access_list = '50'
@@ -845,13 +1023,9 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
'as-path-prepend-last-as' : '5',
'atomic-aggregate' : '',
'distance' : '110',
- 'extcommunity-bw' : '20000',
- 'extcommunity-rt' : '123:456',
- 'extcommunity-soo' : '456:789',
'ipv6-next-hop-global' : '2001::1',
'ipv6-next-hop-local' : 'fe80::1',
'ip-next-hop' : '192.168.1.1',
- 'large-community' : '100:200:300',
'local-preference' : '500',
'metric' : '150',
'metric-type' : 'type-1',
@@ -1049,20 +1223,12 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
self.cli_set(path + ['rule', rule, 'set', 'atomic-aggregate'])
if 'distance' in rule_config['set']:
self.cli_set(path + ['rule', rule, 'set', 'distance', rule_config['set']['distance']])
- if 'extcommunity-bw' in rule_config['set']:
- self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'bandwidth', rule_config['set']['extcommunity-bw']])
- if 'extcommunity-rt' in rule_config['set']:
- self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'rt', rule_config['set']['extcommunity-rt']])
- if 'extcommunity-soo' in rule_config['set']:
- self.cli_set(path + ['rule', rule, 'set', 'extcommunity', 'soo', rule_config['set']['extcommunity-soo']])
if 'ipv6-next-hop-global' in rule_config['set']:
self.cli_set(path + ['rule', rule, 'set', 'ipv6-next-hop', 'global', rule_config['set']['ipv6-next-hop-global']])
if 'ipv6-next-hop-local' in rule_config['set']:
self.cli_set(path + ['rule', rule, 'set', 'ipv6-next-hop', 'local', rule_config['set']['ipv6-next-hop-local']])
if 'ip-next-hop' in rule_config['set']:
self.cli_set(path + ['rule', rule, 'set', 'ip-next-hop', rule_config['set']['ip-next-hop']])
- if 'large-community' in rule_config['set']:
- self.cli_set(path + ['rule', rule, 'set', 'large-community', rule_config['set']['large-community']])
if 'local-preference' in rule_config['set']:
self.cli_set(path + ['rule', rule, 'set', 'local-preference', rule_config['set']['local-preference']])
if 'metric' in rule_config['set']:
@@ -1236,20 +1402,12 @@ class TestPolicy(VyOSUnitTestSHIM.TestCase):
tmp += 'atomic-aggregate'
elif 'distance' in rule_config['set']:
tmp += 'distance ' + rule_config['set']['distance']
- elif 'extcommunity-bw' in rule_config['set']:
- tmp += 'extcommunity bandwidth' + rule_config['set']['extcommunity-bw']
- elif 'extcommunity-rt' in rule_config['set']:
- tmp += 'extcommunity rt' + rule_config['set']['extcommunity-rt']
- elif 'extcommunity-soo' in rule_config['set']:
- tmp += 'extcommunity rt' + rule_config['set']['extcommunity-soo']
elif 'ip-next-hop' in rule_config['set']:
tmp += 'ip next-hop ' + rule_config['set']['ip-next-hop']
elif 'ipv6-next-hop-global' in rule_config['set']:
tmp += 'ipv6 next-hop global ' + rule_config['set']['ipv6-next-hop-global']
elif 'ipv6-next-hop-local' in rule_config['set']:
tmp += 'ipv6 next-hop local ' + rule_config['set']['ipv6-next-hop-local']
- elif 'large-community' in rule_config['set']:
- tmp += 'large-community ' + rule_config['set']['large-community']
elif 'local-preference' in rule_config['set']:
tmp += 'local-preference ' + rule_config['set']['local-preference']
elif 'metric' in rule_config['set']:
diff --git a/smoketest/scripts/cli/test_protocols_nhrp.py b/smoketest/scripts/cli/test_protocols_nhrp.py
index 9a00b86fc..59252875b 100755
--- a/smoketest/scripts/cli/test_protocols_nhrp.py
+++ b/smoketest/scripts/cli/test_protocols_nhrp.py
@@ -65,7 +65,6 @@ class TestProtocolsNHRP(VyOSUnitTestSHIM.TestCase):
self.cli_set(nhrp_path + ["tunnel", tunnel_if, "shortcut"])
# IKE/ESP Groups
- self.cli_set(vpn_path + ["esp-group", esp_group, "compression", "disable"])
self.cli_set(vpn_path + ["esp-group", esp_group, "lifetime", "1800"])
self.cli_set(vpn_path + ["esp-group", esp_group, "mode", "transport"])
self.cli_set(vpn_path + ["esp-group", esp_group, "pfs", "dh-group2"])
@@ -74,7 +73,6 @@ class TestProtocolsNHRP(VyOSUnitTestSHIM.TestCase):
self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "2", "encryption", "3des"])
self.cli_set(vpn_path + ["esp-group", esp_group, "proposal", "2", "hash", "md5"])
- self.cli_set(vpn_path + ["ike-group", ike_group, "ikev2-reauth", "no"])
self.cli_set(vpn_path + ["ike-group", ike_group, "key-exchange", "ikev1"])
self.cli_set(vpn_path + ["ike-group", ike_group, "lifetime", "3600"])
self.cli_set(vpn_path + ["ike-group", ike_group, "proposal", "1", "dh-group", "2"])
diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py
index e15ea478b..93bb761c1 100755
--- a/smoketest/scripts/cli/test_protocols_ospf.py
+++ b/smoketest/scripts/cli/test_protocols_ospf.py
@@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import logging
-import sys
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
@@ -23,15 +21,12 @@ from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
from vyos.util import process_named_running
-from vyos.util import cmd
PROCESS_NAME = 'ospfd'
base_path = ['protocols', 'ospf']
route_map = 'foo-bar-baz10'
-log = logging.getLogger('TestProtocolsOSPF')
-
class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
@@ -210,25 +205,14 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map])
self.cli_set(base_path + ['redistribute', protocol, 'metric-type', metric_type])
- # enable FRR debugging to find the root cause of failing testcases
- cmd('touch /tmp/vyos.frr.debug')
-
# commit changes
self.cli_commit()
- # disable FRR debugging
- cmd('rm -f /tmp/vyos.frr.debug')
-
# Verify FRR ospfd configuration
frrconfig = self.getFRRconfig('router ospf')
- try:
- self.assertIn(f'router ospf', frrconfig)
- for protocol in redistribute:
- self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig)
- except:
- log.debug(frrconfig)
- log.debug(cmd('sudo cat /tmp/vyos-configd-script-stdout'))
- self.fail('Now we can hopefully see why OSPF fails!')
+ self.assertIn(f'router ospf', frrconfig)
+ for protocol in redistribute:
+ self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig)
def test_ospf_08_virtual_link(self):
networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
@@ -396,6 +380,44 @@ class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase):
self.assertIn(f' network {network} area {area}', frrconfig)
self.assertIn(f' area {area} export-list {acl}', frrconfig)
+
+ def test_ospf_14_segment_routing_configuration(self):
+ global_block_low = "100"
+ global_block_high = "199"
+ local_block_low = "200"
+ local_block_high = "299"
+ interface = 'lo'
+ maximum_stack_size = '5'
+ prefix_one = '192.168.0.1/32'
+ prefix_two = '192.168.0.2/32'
+ prefix_one_value = '1'
+ prefix_two_value = '2'
+
+ self.cli_set(base_path + ['interface', interface])
+ self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size])
+ self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low])
+ self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high])
+ self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low])
+ self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high])
+ self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value])
+ self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null'])
+ self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value])
+ self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag'])
+
+ # Commit all changes
+ self.cli_commit()
+
+ # Verify all changes
+
+ frrconfig = self.getFRRconfig('router ospf')
+ self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', frrconfig)
+ self.assertIn(f' segment-routing node-msd {maximum_stack_size}', frrconfig)
+ self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', frrconfig)
+ self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', frrconfig)
+
+ self.skipTest('https://github.com/FRRouting/frr/issues/12007')
+ self.assertIn(f' segment-routing on', frrconfig)
+
+
if __name__ == '__main__':
- logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_ids.py b/smoketest/scripts/cli/test_service_ids.py
index d471eeaed..dcf2bcefe 100755
--- a/smoketest/scripts/cli/test_service_ids.py
+++ b/smoketest/scripts/cli/test_service_ids.py
@@ -77,9 +77,9 @@ class TestServiceIDS(VyOSUnitTestSHIM.TestCase):
self.cli_set(base_path + ['listen-interface', tmp])
self.cli_set(base_path + ['direction', 'in'])
- self.cli_set(base_path + ['threshold', 'fps', fps])
- self.cli_set(base_path + ['threshold', 'pps', pps])
- self.cli_set(base_path + ['threshold', 'mbps', mbps])
+ self.cli_set(base_path + ['threshold', 'general', 'fps', fps])
+ self.cli_set(base_path + ['threshold', 'general', 'pps', pps])
+ self.cli_set(base_path + ['threshold', 'general', 'mbps', mbps])
# commit changes
self.cli_commit()
diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py
new file mode 100755
index 000000000..bdab35834
--- /dev/null
+++ b/smoketest/scripts/cli/test_service_ipoe-server.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2022 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import unittest
+
+from base_accel_ppp_test import BasicAccelPPPTest
+from vyos.configsession import ConfigSessionError
+from vyos.util import cmd
+
+from configparser import ConfigParser
+
+ac_name = 'ACN'
+interface = 'eth0'
+
+class TestServiceIPoEServer(BasicAccelPPPTest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls._base_path = ['service', 'ipoe-server']
+ cls._config_file = '/run/accel-pppd/ipoe.conf'
+ cls._chap_secrets = '/run/accel-pppd/ipoe.chap-secrets'
+
+ # call base-classes classmethod
+ super(TestServiceIPoEServer, cls).setUpClass()
+
+ def verify(self, conf):
+ super().verify(conf)
+
+ # Validate configuration values
+ accel_modules = list(conf['modules'].keys())
+ self.assertIn('log_syslog', accel_modules)
+ self.assertIn('ipoe', accel_modules)
+ self.assertIn('shaper', accel_modules)
+ self.assertIn('ipv6pool', accel_modules)
+ self.assertIn('ipv6_nd', accel_modules)
+ self.assertIn('ipv6_dhcp', accel_modules)
+ self.assertIn('ippool', accel_modules)
+
+ def basic_config(self):
+ self.set(['interface', interface, 'client-subnet', '192.168.0.0/24'])
+
+ def test_accel_local_authentication(self):
+ mac_address = '08:00:27:2f:d8:06'
+ self.set(['authentication', 'interface', interface, 'mac', mac_address])
+ self.set(['authentication', 'mode', 'local'])
+
+ # No IPoE interface configured
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+
+ # Test configuration of local authentication for PPPoE server
+ self.basic_config()
+
+ # commit changes
+ self.cli_commit()
+
+ # Validate configuration values
+ conf = ConfigParser(allow_no_value=True, delimiters='=')
+ conf.read(self._config_file)
+
+ # check proper path to chap-secrets file
+ self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets)
+
+ accel_modules = list(conf['modules'].keys())
+ self.assertIn('chap-secrets', accel_modules)
+
+ # basic verification
+ self.verify(conf)
+
+ # check local users
+ tmp = cmd(f'sudo cat {self._chap_secrets}')
+ regex = f'{interface}\s+\*\s+{mac_address}\s+\*'
+ tmp = re.findall(regex, tmp)
+ self.assertTrue(tmp)
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
+
diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py
index fae16d3b3..7546c2e3d 100755
--- a/smoketest/scripts/cli/test_service_pppoe-server.py
+++ b/smoketest/scripts/cli/test_service_pppoe-server.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 020 VyOS maintainers and contributors
+# Copyright (C) 2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,14 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
from base_accel_ppp_test import BasicAccelPPPTest
from configparser import ConfigParser
-from vyos.configsession import ConfigSessionError
-from vyos.util import process_named_running
from vyos.util import read_file
from vyos.template import range_to_regex
@@ -30,24 +27,19 @@ ac_name = 'ACN'
interface = 'eth0'
class TestServicePPPoEServer(BasicAccelPPPTest.TestCase):
- def setUp(self):
- self._base_path = ['service', 'pppoe-server']
- self._process_name = 'accel-pppd'
- self._config_file = '/run/accel-pppd/pppoe.conf'
- self._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets'
+ @classmethod
+ def setUpClass(cls):
+ cls._base_path = ['service', 'pppoe-server']
+ cls._config_file = '/run/accel-pppd/pppoe.conf'
+ cls._chap_secrets = '/run/accel-pppd/pppoe.chap-secrets'
- super().setUp()
+ # call base-classes classmethod
+ super(TestServicePPPoEServer, cls).setUpClass()
def tearDown(self):
- # Check for running process
- self.assertTrue(process_named_running(self._process_name))
-
self.cli_delete(local_if)
super().tearDown()
- # Check for running process
- self.assertFalse(process_named_running(self._process_name))
-
def verify(self, conf):
mtu = '1492'
diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py
index f58920b5b..434e3aa05 100755
--- a/smoketest/scripts/cli/test_vpn_sstp.py
+++ b/smoketest/scripts/cli/test_vpn_sstp.py
@@ -19,29 +19,49 @@ import unittest
from base_accel_ppp_test import BasicAccelPPPTest
from vyos.util import read_file
-
pki_path = ['pki']
-cert_data = 'MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIwWTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIxMDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu+JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3LftzngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93+dm/LDnp7C0='
-key_data = 'MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww'
+
+cert_data = """
+MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIw
+WTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv
+bWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIx
+MDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNV
+BAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlP
+UzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE
+01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3
+QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8E
+BAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu
++JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3Lftz
+ngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93
++dm/LDnp7C0="""
+
+key_data = """
+MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx
+2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7
+u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww
+"""
class TestVPNSSTPServer(BasicAccelPPPTest.TestCase):
- def setUp(self):
- self._base_path = ['vpn', 'sstp']
- self._process_name = 'accel-pppd'
- self._config_file = '/run/accel-pppd/sstp.conf'
- self._chap_secrets = '/run/accel-pppd/sstp.chap-secrets'
- super().setUp()
+ @classmethod
+ def setUpClass(cls):
+ cls._base_path = ['vpn', 'sstp']
+ cls._config_file = '/run/accel-pppd/sstp.conf'
+ cls._chap_secrets = '/run/accel-pppd/sstp.chap-secrets'
- def tearDown(self):
- self.cli_delete(pki_path)
- super().tearDown()
+ # call base-classes classmethod
+ super(TestVPNSSTPServer, cls).setUpClass()
- def basic_config(self):
- self.cli_delete(pki_path)
- self.cli_set(pki_path + ['ca', 'sstp', 'certificate', cert_data])
- self.cli_set(pki_path + ['certificate', 'sstp', 'certificate', cert_data])
- self.cli_set(pki_path + ['certificate', 'sstp', 'private', 'key', key_data])
+ cls.cli_set(cls, pki_path + ['ca', 'sstp', 'certificate', cert_data.replace('\n','')])
+ cls.cli_set(cls, pki_path + ['certificate', 'sstp', 'certificate', cert_data.replace('\n','')])
+ cls.cli_set(cls, pki_path + ['certificate', 'sstp', 'private', 'key', key_data.replace('\n','')])
+ @classmethod
+ def tearDownClass(cls):
+ cls.cli_delete(cls, pki_path)
+
+ super(TestVPNSSTPServer, cls).tearDownClass()
+
+ def basic_config(self):
# SSL is mandatory
self.set(['ssl', 'ca-certificate', 'sstp'])
self.set(['ssl', 'certificate', 'sstp'])