summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rwxr-xr-xsmoketest/scripts/cli/test_firewall.py20
-rwxr-xr-xsmoketest/scripts/cli/test_ha_vrrp.py30
-rwxr-xr-xsmoketest/scripts/cli/test_policy_route.py34
-rwxr-xr-xsmoketest/scripts/cli/test_system_ntp.py60
4 files changed, 101 insertions, 43 deletions
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py
index 2b3b354ba..6b74e6c92 100755
--- a/smoketest/scripts/cli/test_firewall.py
+++ b/smoketest/scripts/cli/test_firewall.py
@@ -46,6 +46,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
def test_groups(self):
+ self.cli_set(['firewall', 'group', 'mac-group', 'smoketest_mac', 'mac-address', '00:01:02:03:04:05'])
self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24'])
self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53'])
self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123'])
@@ -53,7 +54,9 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network'])
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10'])
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port'])
- self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'accept'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'group', 'mac-group', 'smoketest_mac'])
self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest'])
@@ -61,7 +64,8 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
nftables_search = [
['iifname "eth0"', 'jump smoketest'],
- ['ip saddr { 172.16.99.0/24 }', 'ip daddr 172.16.10.10', 'tcp dport { 53, 123 }', 'return'],
+ ['ip saddr { 172.16.99.0/24 }', 'ip daddr 172.16.10.10', 'th dport { 53, 123 }', 'return'],
+ ['ether saddr { 00:01:02:03:04:05 }', 'return']
]
nftables_output = cmd('sudo nft list table ip filter')
@@ -72,7 +76,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
if all(item in line for item in search):
matched = True
break
- self.assertTrue(matched)
+ self.assertTrue(matched, msg=search)
def test_basic_rules(self):
self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop'])
@@ -80,8 +84,10 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10'])
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10'])
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject'])
- self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'protocol', 'tcp_udp'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'protocol', 'tcp'])
self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'destination', 'port', '8888'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'syn'])
+ self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'not', 'ack'])
self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest'])
@@ -90,7 +96,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
nftables_search = [
['iifname "eth0"', 'jump smoketest'],
['saddr 172.16.20.10', 'daddr 172.16.10.10', 'return'],
- ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'reject'],
+ ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'reject'],
['smoketest default-action', 'drop']
]
@@ -102,7 +108,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
if all(item in line for item in search):
matched = True
break
- self.assertTrue(matched)
+ self.assertTrue(matched, msg=search)
def test_basic_rules_ipv6(self):
self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'default-action', 'drop'])
@@ -132,7 +138,7 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
if all(item in line for item in search):
matched = True
break
- self.assertTrue(matched)
+ self.assertTrue(matched, msg=search)
def test_state_policy(self):
self.cli_set(['firewall', 'state-policy', 'established', 'action', 'accept'])
diff --git a/smoketest/scripts/cli/test_ha_vrrp.py b/smoketest/scripts/cli/test_ha_vrrp.py
index a37ce7e64..68905e447 100755
--- a/smoketest/scripts/cli/test_ha_vrrp.py
+++ b/smoketest/scripts/cli/test_ha_vrrp.py
@@ -166,5 +166,35 @@ class TestVRRP(VyOSUnitTestSHIM.TestCase):
for group in groups:
self.assertIn(f'{group}', config)
+ def test_04_exclude_vrrp_interface(self):
+ group = 'VyOS-WAN'
+ none_vrrp_interface = 'eth2'
+ vlan_id = '24'
+ vip = '100.64.24.1/24'
+ vip_dev = '192.0.2.2/24'
+ vrid = '150'
+ group_base = base_path + ['vrrp', 'group', group]
+
+ self.cli_set(['interfaces', 'ethernet', vrrp_interface, 'vif', vlan_id, 'address', '100.64.24.11/24'])
+ self.cli_set(group_base + ['interface', f'{vrrp_interface}.{vlan_id}'])
+ self.cli_set(group_base + ['address', vip])
+ self.cli_set(group_base + ['address', vip_dev, 'interface', none_vrrp_interface])
+ self.cli_set(group_base + ['track', 'exclude-vrrp-interface'])
+ self.cli_set(group_base + ['track', 'interface', none_vrrp_interface])
+ self.cli_set(group_base + ['vrid', vrid])
+
+ # commit changes
+ self.cli_commit()
+
+ config = getConfig(f'vrrp_instance {group}')
+
+ self.assertIn(f'interface {vrrp_interface}.{vlan_id}', config)
+ self.assertIn(f'virtual_router_id {vrid}', config)
+ self.assertIn(f'dont_track_primary', config)
+ self.assertIn(f' {vip}', config)
+ self.assertIn(f' {vip_dev} dev {none_vrrp_interface}', config)
+ self.assertIn(f'track_interface', config)
+ self.assertIn(f' {none_vrrp_interface}', config)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py
index 70a234187..9035f0832 100755
--- a/smoketest/scripts/cli/test_policy_route.py
+++ b/smoketest/scripts/cli/test_policy_route.py
@@ -31,8 +31,9 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
self.cli_delete(['interfaces', 'ethernet', 'eth0'])
+ self.cli_delete(['protocols', 'static'])
self.cli_delete(['policy', 'route'])
- self.cli_delete(['policy', 'ipv6-route'])
+ self.cli_delete(['policy', 'route6'])
self.cli_commit()
def test_pbr_mark(self):
@@ -62,19 +63,27 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):
self.assertTrue(matched)
def test_pbr_table(self):
- self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp'])
self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'port', '8888'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'syn'])
+ self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'not', 'ack'])
self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'table', table_id])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'protocol', 'tcp_udp'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'destination', 'port', '8888'])
+ self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'set', 'table', table_id])
self.cli_set(['interfaces', 'ethernet', 'eth0', 'policy', 'route', 'smoketest'])
+ self.cli_set(['interfaces', 'ethernet', 'eth0', 'policy', 'route6', 'smoketest6'])
self.cli_commit()
mark_hex = "{0:#010x}".format(table_mark_offset - int(table_id))
+ # IPv4
+
nftables_search = [
['iifname "eth0"', 'jump VYOS_PBR_smoketest'],
- ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'meta mark set ' + mark_hex]
+ ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'meta mark set ' + mark_hex]
]
nftables_output = cmd('sudo nft list table ip mangle')
@@ -87,6 +96,25 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):
break
self.assertTrue(matched)
+ # IPv6
+
+ nftables6_search = [
+ ['iifname "eth0"', 'jump VYOS_PBR6_smoketest'],
+ ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'meta mark set ' + mark_hex]
+ ]
+
+ nftables6_output = cmd('sudo nft list table ip6 mangle')
+
+ for search in nftables6_search:
+ matched = False
+ for line in nftables6_output.split("\n"):
+ if all(item in line for item in search):
+ matched = True
+ break
+ self.assertTrue(matched)
+
+ # IP rule fwmark -> table
+
ip_rule_search = [
['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id]
]
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py
index e8cc64463..c8cf04b7d 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_system_ntp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2020 VyOS maintainers and contributors
+# Copyright (C) 2019-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,7 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
@@ -29,17 +28,14 @@ PROCESS_NAME = 'ntpd'
NTP_CONF = '/run/ntpd/ntpd.conf'
base_path = ['system', 'ntp']
-def get_config_value(key):
- tmp = read_file(NTP_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- # remove possible trailing whitespaces
- return [item.strip() for item in tmp]
-
class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
+ super(cls, cls).setUpClass()
+
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.cli_delete(base_path)
+ cls.cli_delete(cls, base_path)
def tearDown(self):
self.cli_delete(base_path)
@@ -47,35 +43,38 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
self.assertFalse(process_named_running(PROCESS_NAME))
- def test_ntp_options(self):
+ def test_01_ntp_options(self):
# Test basic NTP support with multiple servers and their options
servers = ['192.0.2.1', '192.0.2.2']
options = ['noselect', 'preempt', 'prefer']
- ntp_pool = 'pool.vyos.io'
+ pools = ['pool.vyos.io']
for server in servers:
for option in options:
self.cli_set(base_path + ['server', server, option])
# Test NTP pool
- self.cli_set(base_path + ['server', ntp_pool, 'pool'])
+ for pool in pools:
+ self.cli_set(base_path + ['server', pool, 'pool'])
# commit changes
self.cli_commit()
# Check generated configuration
- tmp = get_config_value('server')
- for server in servers:
- test = f'{server} iburst ' + ' '.join(options)
- self.assertTrue(test in tmp)
+ config = read_file(NTP_CONF)
+ self.assertIn('driftfile /var/lib/ntp/ntp.drift', config)
+ self.assertIn('restrict default noquery nopeer notrap nomodify', config)
+ self.assertIn('restrict source nomodify notrap noquery', config)
+ self.assertIn('restrict 127.0.0.1', config)
+ self.assertIn('restrict -6 ::1', config)
- tmp = get_config_value('pool')
- self.assertTrue(f'{ntp_pool} iburst' in tmp)
+ for server in servers:
+ self.assertIn(f'server {server} iburst ' + ' '.join(options), config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ for pool in pools:
+ self.assertIn(f'pool {pool} iburst', config)
- def test_ntp_clients(self):
+ def test_02_ntp_clients(self):
# Test the allowed-networks statement
listen_address = ['127.0.0.1', '::1']
for listen in listen_address:
@@ -96,23 +95,18 @@ class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
# Check generated client address configuration
+ config = read_file(NTP_CONF)
+ self.assertIn('restrict default ignore', config)
+
for network in networks:
network_address = address_from_cidr(network)
network_netmask = netmask_from_cidr(network)
-
- tmp = get_config_value(f'restrict {network_address}')[0]
- test = f'mask {network_netmask} nomodify notrap nopeer'
- self.assertTrue(tmp in test)
+ self.assertIn(f'restrict {network_address} mask {network_netmask} nomodify notrap nopeer', config)
# Check listen address
- tmp = get_config_value('interface')
- test = ['ignore wildcard']
+ self.assertIn('interface ignore wildcard', config)
for listen in listen_address:
- test.append(f'listen {listen}')
- self.assertEqual(tmp, test)
-
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
+ self.assertIn(f'interface listen {listen}', config)
if __name__ == '__main__':
unittest.main(verbosity=2)