summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/cli/base_interfaces_test.py177
-rwxr-xr-xscripts/cli/test_interfaces_bonding.py56
-rwxr-xr-xscripts/cli/test_interfaces_bridge.py41
-rwxr-xr-xscripts/cli/test_interfaces_ethernet.py36
-rwxr-xr-xscripts/cli/test_interfaces_l2tpv3.py59
-rwxr-xr-xscripts/cli/test_interfaces_pppoe.py39
-rwxr-xr-xscripts/cli/test_interfaces_pseudo_ethernet.py23
-rwxr-xr-xscripts/cli/test_interfaces_wireless.py26
-rwxr-xr-xscripts/cli/test_nat.py63
-rwxr-xr-xscripts/cli/test_service_bcast-relay.py71
-rwxr-xr-xscripts/cli/test_service_mdns-repeater.py51
-rwxr-xr-xscripts/cli/test_service_pppoe-server.py154
-rwxr-xr-xscripts/cli/test_service_router-advert.py99
-rwxr-xr-xscripts/cli/test_service_snmp.py57
-rwxr-xr-xscripts/cli/test_service_ssh.py4
-rwxr-xr-xscripts/cli/test_system_ntp.py108
-rwxr-xr-xscripts/cli/test_vpn_anyconnect.py58
17 files changed, 1010 insertions, 112 deletions
diff --git a/scripts/cli/base_interfaces_test.py b/scripts/cli/base_interfaces_test.py
index 53fe553bc..14ec7e137 100644
--- a/scripts/cli/base_interfaces_test.py
+++ b/scripts/cli/base_interfaces_test.py
@@ -15,17 +15,28 @@
import os
import unittest
-from vyos.configsession import ConfigSession
from netifaces import ifaddresses, AF_INET, AF_INET6
-from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local
+
+from vyos.configsession import ConfigSession
from vyos.ifconfig import Interface
+from vyos.util import read_file
+from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local
class BasicInterfaceTest:
class BaseTest(unittest.TestCase):
+ _test_ip = False
_test_mtu = False
+ _test_vlan = False
+ _test_qinq = False
+ _test_ipv6 = False
_base_path = []
+
_options = {}
_interfaces = []
+ _qinq_range = ['10', '20', '30']
+ _vlan_range = ['100', '200', '300', '2000']
+ # choose IPv6 minimum MTU value for tests - this must always work
+ _mtu = '1280'
def setUp(self):
self.session = ConfigSession(os.getpid())
@@ -38,9 +49,14 @@ class BasicInterfaceTest:
def tearDown(self):
# we should not remove ethernet from the overall CLI
if 'ethernet' in self._base_path:
- self.session.delete(self._base_path)
- for intf in self._interfaces:
- self.session.set(self._base_path + [intf])
+ for interface in self._interfaces:
+ # when using a dedicated interface to test via TEST_ETH environment
+ # variable only this one will be cleared in the end - usable to test
+ # ethernet interfaces via SSH
+ self.session.delete(self._base_path + [interface])
+ self.session.set(self._base_path + [interface, 'duplex', 'auto'])
+ self.session.set(self._base_path + [interface, 'speed', 'auto'])
+ self.session.set(self._base_path + [interface, 'smp-affinity', 'auto'])
else:
self.session.delete(self._base_path)
@@ -105,26 +121,149 @@ class BasicInterfaceTest:
self.assertTrue(is_intf_addr_assigned(intf, addr['addr']))
+ def test_ipv6_link_local(self):
+ """ Common function for IPv6 link-local address assignemnts """
+ if not self._test_ipv6:
+ return None
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(base + option.split())
+
+ # after commit we must have an IPv6 link-local address
+ self.session.commit()
+
+ for interface in self._interfaces:
+ for addr in ifaddresses(interface)[AF_INET6]:
+ self.assertTrue(is_ipv6_link_local(addr['addr']))
+
+ # disable IPv6 link-local address assignment
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+
+ # after commit we must have no IPv6 link-local address
+ self.session.commit()
+
+ for interface in self._interfaces:
+ self.assertTrue(AF_INET6 not in ifaddresses(interface))
+
+ def _mtu_test(self, intf):
+ """ helper function to verify MTU size """
+ with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f:
+ tmp = f.read().rstrip()
+ self.assertEqual(tmp, self._mtu)
def test_change_mtu(self):
- """
- Check if MTU can be changed on interface.
- Test MTU size will be 1400 bytes.
- """
- if self._test_mtu is False:
+ """ Testcase if MTU can be changed on interface """
+ if not self._test_mtu:
return None
-
- # choose a MTU which works on every interface
- mtu = '1400'
for intf in self._interfaces:
- self.session.set(self._base_path + [intf, 'mtu', mtu])
+ base = self._base_path + [intf]
+ self.session.set(base + ['mtu', self._mtu])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.session.set(base + option.split())
self.session.commit()
+ for intf in self._interfaces:
+ self._mtu_test(intf)
- # Validate interface description
+ def test_8021q_vlan(self):
+ """ Testcase for 802.1q VLAN interfaces """
+ if not self._test_vlan:
+ return None
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(base + option.split())
+
+ for vlan in self._vlan_range:
+ base = self._base_path + [interface, 'vif', vlan]
+ self.session.set(base + ['mtu', self._mtu])
+ for address in self._test_addr:
+ self.session.set(base + ['address', address])
+
+ self.session.commit()
for intf in self._interfaces:
- with open('/sys/class/net/{}/mtu'.format(intf), 'r') as f:
- tmp = f.read().rstrip()
- self.assertTrue(tmp == mtu)
+ for vlan in self._vlan_range:
+ vif = f'{intf}.{vlan}'
+ for address in self._test_addr:
+ self.assertTrue(is_intf_addr_assigned(vif, address))
+ self._mtu_test(vif)
+
+
+ def test_8021ad_qinq_vlan(self):
+ """ Testcase for 802.1ad Q-in-Q VLAN interfaces """
+ if not self._test_qinq:
+ return None
+
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(base + option.split())
+
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
+ self.session.set(base + ['mtu', self._mtu])
+ for address in self._test_addr:
+ self.session.set(base + ['address', address])
+
+ self.session.commit()
+ for interface in self._interfaces:
+ for vif_s in self._qinq_range:
+ for vif_c in self._vlan_range:
+ vif = f'{interface}.{vif_s}.{vif_c}'
+ for address in self._test_addr:
+ self.assertTrue(is_intf_addr_assigned(vif, address))
+ self._mtu_test(vif)
+
+ def test_ip_options(self):
+ """ test IP options like arp """
+ if not self._test_ip:
+ return None
+
+ for interface in self._interfaces:
+ arp_tmo = '300'
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(path + option.split())
+
+ # Options
+ self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
+ self.session.set(path + ['ip', 'disable-arp-filter'])
+ self.session.set(path + ['ip', 'enable-arp-accept'])
+ self.session.set(path + ['ip', 'enable-arp-announce'])
+ self.session.set(path + ['ip', 'enable-arp-ignore'])
+ self.session.set(path + ['ip', 'enable-proxy-arp'])
+ self.session.set(path + ['ip', 'proxy-arp-pvlan'])
+ self.session.set(path + ['ip', 'source-validation', 'loose'])
+
+ self.session.commit()
+
+ for interface in self._interfaces:
+ tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
+ self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter')
+ self.assertEqual('0', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept')
+ self.assertEqual('1', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce')
+ self.assertEqual('1', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore')
+ self.assertEqual('1', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp')
+ self.assertEqual('1', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan')
+ self.assertEqual('1', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter')
+ self.assertEqual('2', tmp)
diff --git a/scripts/cli/test_interfaces_bonding.py b/scripts/cli/test_interfaces_bonding.py
index bfadc4a9d..e3d3b25ee 100755
--- a/scripts/cli/test_interfaces_bonding.py
+++ b/scripts/cli/test_interfaces_bonding.py
@@ -14,46 +14,48 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import unittest
from base_interfaces_test import BasicInterfaceTest
from vyos.ifconfig import Section
from vyos.configsession import ConfigSessionError
+from vyos.util import read_file
class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
- super().setUp()
+ super().setUp()
- self._base_path = ['interfaces', 'bonding']
- self._test_mtu = True
- self._interfaces = ['bond0']
+ self._base_path = ['interfaces', 'bonding']
+ self._interfaces = ['bond0']
+ self._test_mtu = True
+ self._test_vlan = True
+ self._test_qinq = True
+ self._test_ipv6 = True
- def test_add_remove_member(self):
- members = []
+ self._members = []
# we need to filter out VLAN interfaces identified by a dot (.)
# in their name - just in case!
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- members.append(tmp)
-
- for intf in self._interfaces:
- for member in members:
- # We can not enslave an interface when there is an address
- # assigned - take care here - or find them dynamically if a user
- # runs vyos-smoketest on his production device?
- self.session.set(self._base_path + [intf, 'member', 'interface', member])
-
- self.session.commit()
-
- # check validate() - we can only add existing interfaces
- self.session.set(self._base_path + [intf, 'member', 'interface', 'eth99'])
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- # check if member deletion works as expected
- self.session.delete(self._base_path + [intf, 'member'])
- self.session.commit()
+ if 'TEST_ETH' in os.environ:
+ self._members = os.environ['TEST_ETH'].split()
+ else:
+ for tmp in Section.interfaces("ethernet"):
+ if not '.' in tmp:
+ self._members.append(tmp)
+
+ self._options['bond0'] = []
+ for member in self._members:
+ self._options['bond0'].append(f'member interface {member}')
+
+
+ def test_add_address_single(self):
+ """ derived method to check if member interfaces are enslaved properly """
+ super().test_add_address_single()
+
+ for interface in self._interfaces:
+ slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split()
+ self.assertListEqual(slaves, self._members)
if __name__ == '__main__':
unittest.main()
diff --git a/scripts/cli/test_interfaces_bridge.py b/scripts/cli/test_interfaces_bridge.py
index 03c78c210..bc0bb69c6 100755
--- a/scripts/cli/test_interfaces_bridge.py
+++ b/scripts/cli/test_interfaces_bridge.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import unittest
from base_interfaces_test import BasicInterfaceTest
@@ -23,35 +24,45 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
super().setUp()
+ self._test_ipv6 = True
+
self._base_path = ['interfaces', 'bridge']
self._interfaces = ['br0']
- def test_add_remove_member(self):
- members = []
+ self._members = []
# we need to filter out VLAN interfaces identified by a dot (.)
# in their name - just in case!
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- members.append(tmp)
+ if 'TEST_ETH' in os.environ:
+ self._members = os.environ['TEST_ETH'].split()
+ else:
+ for tmp in Section.interfaces("ethernet"):
+ if not '.' in tmp:
+ self._members.append(tmp)
- for intf in self._interfaces:
- cost = 1000
- priority = 10
+ self._options['br0'] = []
+ for member in self._members:
+ self._options['br0'].append(f'member interface {member}')
- self.session.set(self._base_path + [intf, 'stp'])
+ def test_add_remove_member(self):
+ for interface in self._interfaces:
+ base = self._base_path + [interface]
+ self.session.set(base + ['stp'])
+ self.session.set(base + ['address', '192.0.2.1/24'])
+ cost = 1000
+ priority = 10
# assign members to bridge interface
- for member in members:
- self.session.set(self._base_path + [intf, 'member', 'interface', member])
- self.session.set(self._base_path + [intf, 'member', 'interface', member, 'cost', str(cost)])
- self.session.set(self._base_path + [intf, 'member', 'interface', member, 'priority', str(priority)])
+ for member in self._members:
+ base_member = base + ['member', 'interface', member]
+ self.session.set(base_member + ['cost', str(cost)])
+ self.session.set(base_member + ['priority', str(priority)])
cost += 1
priority += 1
self.session.commit()
- for intf in self._interfaces:
- self.session.delete(self._base_path + [intf, 'member'])
+ for interface in self._interfaces:
+ self.session.delete(self._base_path + [interface, 'member'])
self.session.commit()
diff --git a/scripts/cli/test_interfaces_ethernet.py b/scripts/cli/test_interfaces_ethernet.py
index 373c81680..761ec7506 100755
--- a/scripts/cli/test_interfaces_ethernet.py
+++ b/scripts/cli/test_interfaces_ethernet.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import unittest
from base_interfaces_test import BasicInterfaceTest
@@ -24,15 +25,44 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
super().setUp()
self._base_path = ['interfaces', 'ethernet']
+ self._test_ip = True
self._test_mtu = True
+ self._test_vlan = True
+ self._test_qinq = True
+ self._test_ipv6 = True
self._interfaces = []
# we need to filter out VLAN interfaces identified by a dot (.)
# in their name - just in case!
- for tmp in Section.interfaces("ethernet"):
- if not '.' in tmp:
- self._interfaces.append(tmp)
+ if 'TEST_ETH' in os.environ:
+ tmp = os.environ['TEST_ETH'].split()
+ self._interfaces = tmp
+ else:
+ for tmp in Section.interfaces("ethernet"):
+ if not '.' in tmp:
+ self._interfaces.append(tmp)
+ def test_dhcp_disable(self):
+ """
+ When interface is configured as admin down, it must be admin down even
+ """
+ for interface in self._interfaces:
+ self.session.set(self._base_path + [interface, 'disable'])
+ for option in self._options.get(interface, []):
+ self.session.set(self._base_path + [interface] + option.split())
+
+ # Also enable DHCP (ISC DHCP always places interface in admin up
+ # state so we check that we do not start DHCP client.
+ # https://phabricator.vyos.net/T2767
+ self.session.set(self._base_path + [interface, 'address', 'dhcp'])
+
+ self.session.commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ with open(f'/sys/class/net/{interface}/flags', 'r') as f:
+ flags = f.read()
+ self.assertEqual(int(flags, 16) & 1, 0)
if __name__ == '__main__':
unittest.main()
diff --git a/scripts/cli/test_interfaces_l2tpv3.py b/scripts/cli/test_interfaces_l2tpv3.py
new file mode 100755
index 000000000..d8655d157
--- /dev/null
+++ b/scripts/cli/test_interfaces_l2tpv3.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import jmespath
+import unittest
+
+from base_interfaces_test import BasicInterfaceTest
+from vyos.util import cmd
+
+class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
+ def setUp(self):
+ super().setUp()
+
+ self._base_path = ['interfaces', 'l2tpv3']
+ self._options = {
+ 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10',
+ 'tunnel-id 100', 'peer-tunnel-id 10',
+ 'session-id 100', 'peer-session-id 10',
+ 'source-port 1010', 'destination-port 10101'],
+ 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20',
+ 'peer-tunnel-id 200', 'remote-ip 127.20.20.20',
+ 'session-id 20', 'tunnel-id 200',
+ 'source-port 2020', 'destination-port 20202'],
+ }
+ self._interfaces = list(self._options)
+
+ def test_add_address_single(self):
+ super().test_add_address_single()
+
+ command = 'sudo ip -j l2tp show session'
+ json_out = json.loads(cmd(command))
+ for interface in self._options:
+ for config in json_out:
+ if config['interface'] == interface:
+ # convert list with configuration items into a dict
+ dict = {}
+ for opt in self._options[interface]:
+ dict.update({opt.split()[0].replace('-','_'): opt.split()[1]})
+
+ for key in ['peer_session_id', 'peer_tunnel_id', 'session_id', 'tunnel_id']:
+ self.assertEqual(str(config[key]), dict[key])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_interfaces_pppoe.py b/scripts/cli/test_interfaces_pppoe.py
index 3c11da795..822f05de6 100755
--- a/scripts/cli/test_interfaces_pppoe.py
+++ b/scripts/cli/test_interfaces_pppoe.py
@@ -53,7 +53,7 @@ class PPPoEInterfaceTest(unittest.TestCase):
self.session.commit()
del self.session
- def test_pppoe_1(self):
+ def test_pppoe(self):
""" Check if PPPoE dialer can be configured and runs """
for interface in self._interfaces:
user = 'VyOS-user-' + interface
@@ -80,13 +80,13 @@ class PPPoEInterfaceTest(unittest.TestCase):
password = 'VyOS-passwd-' + interface
tmp = get_config_value(interface, 'mtu')[1]
- self.assertTrue(tmp in mtu)
+ self.assertEqual(tmp, mtu)
tmp = get_config_value(interface, 'user')[1].replace('"', '')
- self.assertTrue(tmp in user)
+ self.assertEqual(tmp, user)
tmp = get_config_value(interface, 'password')[1].replace('"', '')
- self.assertTrue(tmp in password)
+ self.assertEqual(tmp, password)
tmp = get_config_value(interface, 'ifname')[1]
- self.assertTrue(tmp in interface)
+ self.assertEqual(tmp, interface)
# Check if ppp process is running in the interface in question
running = False
@@ -98,7 +98,7 @@ class PPPoEInterfaceTest(unittest.TestCase):
self.assertTrue(running)
def test_pppoe_dhcpv6pd(self):
- """ Check if PPPoE dialer can be configured and runs """
+ """ Check if PPPoE dialer can be configured with DHCPv6-PD """
address = '1'
sla_id = '0'
sla_len = '8'
@@ -111,39 +111,38 @@ class PPPoEInterfaceTest(unittest.TestCase):
self.session.set(base_path + [interface, 'ipv6', 'enable'])
# prefix delegation stuff
- dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'prefix-delegation']
+ dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0']
self.session.set(dhcpv6_pd_base + ['length', '56'])
self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address])
self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id])
- self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-len', sla_len])
# commit changes
self.session.commit()
# verify "normal" PPPoE value - 1492 is default MTU
tmp = get_config_value(interface, 'mtu')[1]
- self.assertTrue(tmp in '1492')
+ self.assertEqual(tmp, '1492')
tmp = get_config_value(interface, 'user')[1].replace('"', '')
- self.assertTrue(tmp in 'vyos')
+ self.assertEqual(tmp, 'vyos')
tmp = get_config_value(interface, 'password')[1].replace('"', '')
- self.assertTrue(tmp in 'vyos')
+ self.assertEqual(tmp, 'vyos')
for param in ['+ipv6', 'ipv6cp-use-ipaddr']:
tmp = get_config_value(interface, param)[0]
- self.assertTrue(tmp in param)
+ self.assertEqual(tmp, param)
# verify DHCPv6 prefix delegation
# will return: ['delegation', '::/56 infinity;']
tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
- self.assertTrue(tmp in '::/56')
+ self.assertEqual(tmp, '::/56')
tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
- self.assertTrue(tmp in self._source_interface)
+ self.assertEqual(tmp, self._source_interface)
tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
- self.assertTrue(tmp in address)
+ self.assertEqual(tmp, address)
tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
- self.assertTrue(tmp in sla_id)
+ self.assertEqual(tmp, sla_id)
tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
- self.assertTrue(tmp in sla_len)
+ self.assertEqual(tmp, sla_len)
# Check if ppp process is running in the interface in question
running = False
@@ -153,7 +152,11 @@ class PPPoEInterfaceTest(unittest.TestCase):
self.assertTrue(running)
# We can not check if wide-dhcpv6 process is running as it is started
- # after the PPP interface gets a link to the ISP
+ # after the PPP interface gets a link to the ISP - but we can see if
+ # it would be started by the scripts
+ tmp = read_file(f'/etc/ppp/ipv6-up.d/1000-vyos-pppoe-{interface}')
+ tmp = re.findall(f'systemctl start dhcp6c@{interface}.service', tmp)
+ self.assertTrue(tmp)
if __name__ == '__main__':
unittest.main()
diff --git a/scripts/cli/test_interfaces_pseudo_ethernet.py b/scripts/cli/test_interfaces_pseudo_ethernet.py
index 1f5de4f61..bc2e6e7eb 100755
--- a/scripts/cli/test_interfaces_pseudo_ethernet.py
+++ b/scripts/cli/test_interfaces_pseudo_ethernet.py
@@ -21,18 +21,19 @@ from base_interfaces_test import BasicInterfaceTest
class PEthInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
- super().setUp()
- self._base_path = ['interfaces', 'pseudo-ethernet']
- options = ['source-interface eth0', 'ip arp-cache-timeout 10',
- 'ip disable-arp-filter', 'ip enable-arp-accept',
- 'ip enable-arp-announce', 'ip enable-arp-ignore',
- 'ip enable-proxy-arp', 'ip proxy-arp-pvlan']
+ super().setUp()
+ self._base_path = ['interfaces', 'pseudo-ethernet']
- self._options = {
- 'peth0': options,
- 'peth1': options,
- }
- self._interfaces = list(self._options)
+ self._test_ip = True
+ self._test_mtu = True
+ self._test_vlan = True
+ self._test_qinq = True
+
+ self._options = {
+ 'peth0': ['source-interface eth1'],
+ 'peth1': ['source-interface eth1'],
+ }
+ self._interfaces = list(self._options)
if __name__ == '__main__':
unittest.main()
diff --git a/scripts/cli/test_interfaces_wireless.py b/scripts/cli/test_interfaces_wireless.py
index 5d04a1c44..fae233244 100755
--- a/scripts/cli/test_interfaces_wireless.py
+++ b/scripts/cli/test_interfaces_wireless.py
@@ -18,6 +18,8 @@ import os
import unittest
from base_interfaces_test import BasicInterfaceTest
+from psutil import process_iter
+from vyos.util import check_kmod
class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
@@ -37,18 +39,20 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
self._interfaces = list(self._options)
self.session.set(['system', 'wifi-regulatory-domain', 'SE'])
- def test_wifi_client(self):
- """ test creation of a wireless station """
- for intf in self._interfaces:
- # prepare interfaces
- for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
-
- # commit changes
- self.session.commit()
-
+ def test_add_address_single(self):
+ """ derived method to check if member interfaces are enslaved properly """
+ super().test_add_address_single()
+ for option, option_value in self._options.items():
+ if 'type access-point' in option_value:
+ # Check for running process
+ self.assertIn('hostapd', (p.name() for p in process_iter()))
+ elif 'type station' in option_value:
+ # Check for running process
+ self.assertIn('wpa_supplicant', (p.name() for p in process_iter()))
+ else:
+ self.assertTrue(False)
if __name__ == '__main__':
- os.system('sudo modprobe mac80211_hwsim')
+ check_kmod('mac80211_hwsim')
unittest.main()
diff --git a/scripts/cli/test_nat.py b/scripts/cli/test_nat.py
new file mode 100755
index 000000000..416810e40
--- /dev/null
+++ b/scripts/cli/test_nat.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import jmespath
+import json
+import unittest
+
+from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.util import cmd
+
+base_path = ['nat']
+snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}'
+
+class TestNAT(unittest.TestCase):
+ def setUp(self):
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ self.session = ConfigSession(os.getpid())
+ self.session.delete(base_path)
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.commit()
+
+ def test_source_nat(self):
+ """ Configure and validate source NAT rule(s) """
+
+ path = base_path + ['source']
+ network = '192.168.0.0/16'
+ self.session.set(path + ['rule', '1', 'destination', 'address', network])
+ self.session.set(path + ['rule', '1', 'exclude'])
+
+ # check validate() - outbound-interface must be defined
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ self.session.set(path + ['rule', '1', 'outbound-interface', 'any'])
+ self.session.commit()
+
+ tmp = cmd('sudo nft -j list table nat')
+ nftable_json = json.loads(tmp)
+ condensed_json = jmespath.search(snat_pattern, nftable_json)[0]
+
+ self.assertEqual(condensed_json['comment'], 'DST-NAT-1')
+ self.assertEqual(condensed_json['address']['network'], network.split('/')[0])
+ self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1])
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_service_bcast-relay.py b/scripts/cli/test_service_bcast-relay.py
new file mode 100755
index 000000000..fe4531c3b
--- /dev/null
+++ b/scripts/cli/test_service_bcast-relay.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from psutil import process_iter
+from vyos.configsession import ConfigSession, ConfigSessionError
+
+base_path = ['service', 'broadcast-relay']
+
+class TestServiceBroadcastRelay(unittest.TestCase):
+ _address1 = '192.0.2.1/24'
+ _address2 = '192.0.2.1/24'
+
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+ self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1])
+ self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2])
+ self.session.commit()
+
+ def tearDown(self):
+ self.session.delete(['interfaces', 'dummy', 'dum1001'])
+ self.session.delete(['interfaces', 'dummy', 'dum1002'])
+ self.session.delete(base_path)
+ self.session.commit()
+ del self.session
+
+ def test_service(self):
+ """ Check if broadcast relay service can be configured and runs """
+ ids = range(1, 5)
+ for id in ids:
+ base = base_path + ['id', str(id)]
+ self.session.set(base + ['description', 'vyos'])
+ self.session.set(base + ['port', str(10000 + id)])
+
+ # check validate() - two interfaces must be present
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ self.session.set(base + ['interface', 'dum1001'])
+ self.session.set(base + ['interface', 'dum1002'])
+ self.session.set(base + ['address', self._address1.split('/')[0]])
+
+ self.session.commit()
+
+ for id in ids:
+ # check if process is running
+ running = False
+ for p in process_iter():
+ if "udp-broadcast-relay" in p.name():
+ if p.cmdline()[3] == str(id):
+ running = True
+ break
+ self.assertTrue(running)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_service_mdns-repeater.py b/scripts/cli/test_service_mdns-repeater.py
new file mode 100755
index 000000000..18900b6d2
--- /dev/null
+++ b/scripts/cli/test_service_mdns-repeater.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from psutil import process_iter
+from vyos.configsession import ConfigSession
+
+base_path = ['service', 'mdns', 'repeater']
+intf_base = ['interfaces', 'dummy']
+
+class TestServiceMDNSrepeater(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.delete(intf_base + ['dum10'])
+ self.session.delete(intf_base + ['dum20'])
+ self.session.commit()
+ del self.session
+
+ def test_service(self):
+ # Service required a configured IP address on the interface
+
+ self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30'])
+ self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30'])
+
+ self.session.set(base_path + ['interface', 'dum10'])
+ self.session.set(base_path + ['interface', 'dum20'])
+ self.session.commit()
+
+ # Check for running process
+ self.assertTrue("mdns-repeater" in (p.name() for p in process_iter()))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_service_pppoe-server.py b/scripts/cli/test_service_pppoe-server.py
new file mode 100755
index 000000000..318b6530a
--- /dev/null
+++ b/scripts/cli/test_service_pppoe-server.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from configparser import ConfigParser
+from psutil import process_iter
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+
+base_path = ['service', 'pppoe-server']
+local_if = ['interfaces', 'dummy', 'dum667']
+pppoe_conf = '/run/accel-pppd/pppoe.conf'
+
+ac_name = 'ACN'
+subnet = '172.18.0.0/24'
+gateway = '192.0.2.1'
+nameserver = '9.9.9.9'
+mtu = '1492'
+interface = 'eth0'
+
+class TestServicePPPoEServer(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ self.session.delete(base_path)
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.delete(local_if)
+ self.session.commit()
+ del self.session
+
+ def verify(self, conf):
+ # validate some common values in the configuration
+ for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', 'ipv6pool',
+ 'ipv6_nd', 'ipv6_dhcp', 'auth_mschap_v2', 'auth_mschap_v1',
+ 'auth_chap_md5', 'auth_pap', 'shaper']:
+ # Settings without values provide None
+ self.assertEqual(conf['modules'][tmp], None)
+
+ # check Access Concentrator setting
+ self.assertTrue(conf['pppoe']['ac-name'] == ac_name)
+ self.assertTrue(conf['pppoe'].getboolean('verbose'))
+ self.assertTrue(conf['pppoe']['interface'], interface)
+
+ # check configured subnet
+ self.assertEqual(conf['ip-pool'][subnet], None)
+ self.assertEqual(conf['ip-pool']['gw-ip-address'], gateway)
+
+ # check ppp
+ self.assertTrue(conf['ppp'].getboolean('verbose'))
+ self.assertTrue(conf['ppp'].getboolean('check-ip'))
+ self.assertFalse(conf['ppp'].getboolean('ccp'))
+ self.assertEqual(conf['ppp']['min-mtu'], mtu)
+ self.assertEqual(conf['ppp']['mtu'], mtu)
+ self.assertEqual(conf['ppp']['mppe'], 'prefer')
+ self.assertEqual(conf['ppp']['lcp-echo-interval'], '30')
+ self.assertEqual(conf['ppp']['lcp-echo-timeout'], '0')
+ self.assertEqual(conf['ppp']['lcp-echo-failure'], '3')
+
+ def test_local_auth(self):
+ """ Test configuration of local authentication for PPPoE server """
+ self.session.set(local_if + ['address', '192.0.2.1/32'])
+ self.session.set(base_path + ['access-concentrator', ac_name])
+ self.session.set(base_path + ['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos'])
+ self.session.set(base_path + ['authentication', 'mode', 'local'])
+ self.session.set(base_path + ['client-ip-pool', 'subnet', subnet])
+ self.session.set(base_path + ['name-server', nameserver])
+ self.session.set(base_path + ['interface', interface])
+ self.session.set(base_path + ['local-ip', gateway])
+
+ # commit changes
+ self.session.commit()
+
+ # Validate configuration values
+ conf = ConfigParser(allow_no_value=True)
+ conf.read(pppoe_conf)
+
+ # basic verification
+ self.verify(conf)
+
+ # check auth
+ self.assertEqual(conf['chap-secrets']['chap-secrets'], '/run/accel-pppd/pppoe.chap-secrets')
+ self.assertEqual(conf['chap-secrets']['gw-ip-address'], gateway)
+
+ # Check for running process
+ self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
+
+ def test_radius_auth(self):
+ """ Test configuration of RADIUS authentication for PPPoE server """
+ radius_server = '192.0.2.22'
+ radius_key = 'secretVyOS'
+ radius_port = '2000'
+ radius_port_acc = '3000'
+
+ self.session.set(local_if + ['address', '192.0.2.1/32'])
+ self.session.set(base_path + ['access-concentrator', ac_name])
+ self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'key', radius_key])
+ self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'port', radius_port])
+ self.session.set(base_path + ['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc])
+
+ self.session.set(base_path + ['authentication', 'mode', 'radius'])
+ self.session.set(base_path + ['client-ip-pool', 'subnet', subnet])
+ self.session.set(base_path + ['name-server', nameserver])
+ self.session.set(base_path + ['interface', interface])
+ self.session.set(base_path + ['local-ip', gateway])
+
+ # commit changes
+ self.session.commit()
+
+ # Validate configuration values
+ conf = ConfigParser(allow_no_value=True)
+ conf.read(pppoe_conf)
+
+ # basic verification
+ self.verify(conf)
+
+ # check auth
+ self.assertTrue(conf['radius'].getboolean('verbose'))
+ self.assertTrue(conf['radius']['acct-timeout'], '3')
+ self.assertTrue(conf['radius']['timeout'], '3')
+ self.assertTrue(conf['radius']['max-try'], '3')
+ self.assertTrue(conf['radius']['gw-ip-address'], gateway)
+
+ server = conf['radius']['server'].split(',')
+ self.assertEqual(radius_server, server[0])
+ self.assertEqual(radius_key, server[1])
+ self.assertEqual(f'auth-port={radius_port}', server[2])
+ self.assertEqual(f'acct-port={radius_port_acc}', server[3])
+ self.assertEqual(f'req-limit=0', server[4])
+ self.assertEqual(f'fail-time=0', server[5])
+
+ # Check for running process
+ self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_service_router-advert.py b/scripts/cli/test_service_router-advert.py
new file mode 100755
index 000000000..ec2110c8a
--- /dev/null
+++ b/scripts/cli/test_service_router-advert.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from psutil import process_iter
+from vyos.configsession import ConfigSession
+from vyos.util import read_file
+
+RADVD_CONF = '/run/radvd/radvd.conf'
+
+interface = 'eth1'
+base_path = ['service', 'router-advert', 'interface', interface]
+address_base = ['interfaces', 'ethernet', interface, 'address']
+
+def get_config_value(key):
+ tmp = read_file(RADVD_CONF)
+ tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
+ return tmp[0].split()[0].replace(';','')
+
+class TestServiceRADVD(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+ self.session.set(address_base + ['2001:db8::1/64'])
+
+ def tearDown(self):
+ self.session.delete(address_base)
+ self.session.delete(base_path)
+ self.session.commit()
+ del self.session
+
+ def test_single(self):
+ self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag'])
+ self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag'])
+ self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity'])
+ self.session.set(base_path + ['dnssl', '2001:db8::1234'])
+ self.session.set(base_path + ['other-config-flag'])
+
+ # commit changes
+ self.session.commit()
+
+ # verify values
+ tmp = get_config_value('interface')
+ self.assertEqual(tmp, interface)
+
+ tmp = get_config_value('prefix')
+ self.assertEqual(tmp, '::/64')
+
+ tmp = get_config_value('AdvOtherConfigFlag')
+ self.assertEqual(tmp, 'on')
+
+ # this is a default value
+ tmp = get_config_value('AdvRetransTimer')
+ self.assertEqual(tmp, '0')
+
+ # this is a default value
+ tmp = get_config_value('AdvCurHopLimit')
+ self.assertEqual(tmp, '64')
+
+ # this is a default value
+ tmp = get_config_value('AdvDefaultPreference')
+ self.assertEqual(tmp, 'medium')
+
+ tmp = get_config_value('AdvAutonomous')
+ self.assertEqual(tmp, 'off')
+
+ # this is a default value
+ tmp = get_config_value('AdvValidLifetime')
+ self.assertEqual(tmp, 'infinity')
+
+ # this is a default value
+ tmp = get_config_value('AdvPreferredLifetime')
+ self.assertEqual(tmp, '14400')
+
+ tmp = get_config_value('AdvOnLink')
+ self.assertEqual(tmp, 'off')
+
+
+
+ # Check for running process
+ self.assertTrue('radvd' in (p.name() for p in process_iter()))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_service_snmp.py b/scripts/cli/test_service_snmp.py
index d4bbdc0b1..fb5f5393f 100755
--- a/scripts/cli/test_service_snmp.py
+++ b/scripts/cli/test_service_snmp.py
@@ -81,10 +81,10 @@ class TestSNMPService(unittest.TestCase):
self.assertTrue("snmpd" in (p.name() for p in process_iter()))
- def test_snmpv3(self):
- """ Check if SNMPv3 can be configured and service runs"""
+ def test_snmpv3_sha(self):
+ """ Check if SNMPv3 can be configured with SHA authentication and service runs"""
- self.session.set(base_path + ['v3', 'engineid', '0xaffedeadbeef'])
+ self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
# check validate() - a view must be created before this can be comitted
with self.assertRaises(ConfigSessionError):
@@ -92,19 +92,64 @@ class TestSNMPService(unittest.TestCase):
self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
- self.session.commit()
# create user
- for authpriv in ['auth', 'privacy']:
- self.session.set(base_path + ['v3', 'user', 'vyos', authpriv, 'plaintext-key', 'vyos1234'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
+
+ self.session.commit()
+
+ # commit will alter the CLI values - check if they have been updated:
+ hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe'
+ tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
+ self.assertEqual(tmp, hashed_password)
+
+ tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
+ self.assertEqual(tmp, hashed_password)
+
+ # TODO: read in config file and check values
+
+ # Check for running process
+ self.assertTrue("snmpd" in (p.name() for p in process_iter()))
+
+ def test_snmpv3_md5(self):
+ """ Check if SNMPv3 can be configured with MD5 authentication and service runs"""
+
+ self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
+ self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
+ # check validate() - a view must be created before this can be comitted
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
+ self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
+ # create user
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
+ self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des'])
self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
+ self.session.commit()
+
+ # commit will alter the CLI values - check if they have been updated:
+ hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad'
+ tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
+ self.assertEqual(tmp, hashed_password)
+
+ tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
+ self.assertEqual(tmp, hashed_password)
+
# TODO: read in config file and check values
# Check for running process
self.assertTrue("snmpd" in (p.name() for p in process_iter()))
+
if __name__ == '__main__':
unittest.main()
diff --git a/scripts/cli/test_service_ssh.py b/scripts/cli/test_service_ssh.py
index 072984ca8..3ee498f3d 100755
--- a/scripts/cli/test_service_ssh.py
+++ b/scripts/cli/test_service_ssh.py
@@ -22,7 +22,7 @@ from psutil import process_iter
from vyos.configsession import ConfigSession, ConfigSessionError
from vyos.util import read_file
-SSHD_CONF = '/etc/ssh/sshd_config'
+SSHD_CONF = '/run/ssh/sshd_config'
base_path = ['service', 'ssh']
def get_config_value(key):
@@ -51,7 +51,7 @@ class TestServiceSSH(unittest.TestCase):
self.session.set(base_path + ['port', '1234'])
self.session.set(base_path + ['disable-host-validation'])
self.session.set(base_path + ['disable-password-authentication'])
- self.session.set(base_path + ['loglevel', 'VERBOSE'])
+ self.session.set(base_path + ['loglevel', 'verbose'])
self.session.set(base_path + ['client-keepalive-interval', '100'])
self.session.set(base_path + ['listen-address', '127.0.0.1'])
diff --git a/scripts/cli/test_system_ntp.py b/scripts/cli/test_system_ntp.py
new file mode 100755
index 000000000..856a28916
--- /dev/null
+++ b/scripts/cli/test_system_ntp.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from psutil import process_iter
+from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr
+from vyos.util import read_file
+
+NTP_CONF = '/etc/ntp.conf'
+base_path = ['system', 'ntp']
+
+def get_config_value(key):
+ tmp = read_file(NTP_CONF)
+ tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
+ # remove possible trailing whitespaces
+ return [item.strip() for item in tmp]
+
+class TestSystemNTP(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ self.session.delete(base_path)
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.commit()
+ del self.session
+
+ def test_ntp_options(self):
+ """ Test basic NTP support with multiple servers and their options """
+ servers = ['192.0.2.1', '192.0.2.2']
+ options = ['noselect', 'preempt', 'prefer']
+
+ for server in servers:
+ for option in options:
+ self.session.set(base_path + ['server', server, option])
+
+ # commit changes
+ self.session.commit()
+
+ # Check generated configuration
+ tmp = get_config_value('server')
+ for server in servers:
+ test = f'{server} iburst ' + ' '.join(options)
+ self.assertTrue(test in tmp)
+
+ # Check for running process
+ self.assertTrue("ntpd" in (p.name() for p in process_iter()))
+
+ def test_ntp_clients(self):
+ """ Test the allowed-networks statement """
+ listen_address = ['127.0.0.1', '::1']
+ for listen in listen_address:
+ self.session.set(base_path + ['listen-address', listen])
+
+ networks = ['192.0.2.0/24', '2001:db8:1000::/64']
+ for network in networks:
+ self.session.set(base_path + ['allow-clients', 'address', network])
+
+ # Verify "NTP server not configured" verify() statement
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ servers = ['192.0.2.1', '192.0.2.2']
+ for server in servers:
+ self.session.set(base_path + ['server', server])
+
+ self.session.commit()
+
+ # Check generated client address configuration
+ for network in networks:
+ network_address = vyos_address_from_cidr(network)
+ network_netmask = vyos_netmask_from_cidr(network)
+
+ tmp = get_config_value(f'restrict {network_address}')[0]
+ test = f'mask {network_netmask} nomodify notrap nopeer'
+ self.assertTrue(tmp in test)
+
+ # Check listen address
+ tmp = get_config_value('interface')
+ test = ['ignore wildcard']
+ for listen in listen_address:
+ test.append(f'listen {listen}')
+ self.assertEqual(tmp, test)
+
+ # Check for running process
+ self.assertTrue("ntpd" in (p.name() for p in process_iter()))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/scripts/cli/test_vpn_anyconnect.py b/scripts/cli/test_vpn_anyconnect.py
new file mode 100755
index 000000000..dd8ab1609
--- /dev/null
+++ b/scripts/cli/test_vpn_anyconnect.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from psutil import process_iter
+from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.util import read_file
+
+OCSERV_CONF = '/run/ocserv/ocserv.conf'
+base_path = ['vpn', 'anyconnect']
+cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem'
+cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key'
+
+class TestVpnAnyconnect(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+
+ def tearDown(self):
+ # Delete vpn anyconnect configuration
+ self.session.delete(base_path)
+ self.session.commit()
+
+ del self.session
+
+ def test_vpn(self):
+ user = 'vyos_user'
+ password = 'vyos_pass'
+ self.session.delete(base_path)
+ self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password])
+ self.session.set(base_path + ["authentication", "mode", "local"])
+ self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"])
+ self.session.set(base_path + ["ssl", "ca-cert-file", cert])
+ self.session.set(base_path + ["ssl", "cert-file", cert])
+ self.session.set(base_path + ["ssl", "key-file", cert_key])
+
+ self.session.commit()
+
+ # Check for running process
+ self.assertTrue("ocserv-main" in (p.name() for p in process_iter()))
+
+if __name__ == '__main__':
+ unittest.main()