diff options
Diffstat (limited to 'smoketest/scripts/cli/test_interfaces_macsec.py')
| -rwxr-xr-x | smoketest/scripts/cli/test_interfaces_macsec.py | 136 | 
1 files changed, 95 insertions, 41 deletions
| diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index 0f1b6486d..6d1be86ba 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -16,15 +16,17 @@  import re  import unittest -from psutil import process_iter -from vyos.ifconfig import Section  from base_interfaces_test import BasicInterfaceTest +from netifaces import interfaces +  from vyos.configsession import ConfigSessionError +from vyos.ifconfig import Section  from vyos.util import read_file +from vyos.util import process_named_running -def get_config_value(intf, key): -    tmp = read_file(f'/run/wpa_supplicant/{intf}.conf') +def get_config_value(interface, key): +    tmp = read_file(f'/run/wpa_supplicant/{interface}.conf')      tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)      return tmp[0] @@ -32,71 +34,123 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):      def setUp(self):           super().setUp()           self._base_path = ['interfaces', 'macsec'] -         self._options = { -             'macsec0': ['source-interface eth0', -                         'security cipher gcm-aes-128'] -         } +         self._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }           # if we have a physical eth1 interface, add a second macsec instance           if 'eth1' in Section.interfaces("ethernet"): -             macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] } +             macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] }               self._options.update(macsec)           self._interfaces = list(self._options)      def test_encryption(self): -        """ MACsec can be operating in authentication and encryption -        mode - both using different mandatory settings, lets test -        encryption as the basic authentication test has been performed -        using the base class tests """ -        intf = 'macsec0' -        src_intf = 'eth0' +        """ MACsec can be operating in authentication and encryption mode - both +        using different mandatory settings, lets test encryption as the basic +        authentication test has been performed using the base class tests """ +          mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4'          mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836'          replay_window = '64' -        self.session.set(self._base_path + [intf, 'security', 'encrypt']) -        # check validate() - Cipher suite must be set for MACsec -        with self.assertRaises(ConfigSessionError): -            self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128']) +        for interface, option_value in self._options.items(): +            for option in option_value: +                if option.split()[0] == 'source-interface': +                    src_interface = option.split()[1] -        # check validate() - Physical source interface must be set for MACsec -        with self.assertRaises(ConfigSessionError): +                self.session.set(self._base_path + [interface] + option.split()) + +            # Encrypt link +            self.session.set(self._base_path + [interface, 'security', 'encrypt']) + +            # check validate() - Physical source interface MTU must be higher then our MTU +            self.session.set(self._base_path + [interface, 'mtu', '1500']) +            with self.assertRaises(ConfigSessionError): +                self.session.commit() +            self.session.delete(self._base_path + [interface, 'mtu']) + +            # check validate() - MACsec security keys mandartory when encryption is enabled +            with self.assertRaises(ConfigSessionError): +                self.session.commit() +            self.session.set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak]) + +            # check validate() - MACsec security keys mandartory when encryption is enabled +            with self.assertRaises(ConfigSessionError): +                self.session.commit() +            self.session.set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn]) + +            self.session.set(self._base_path + [interface, 'security', 'replay-window', replay_window]) + +            # final commit of settings              self.session.commit() -        self.session.set(self._base_path + [intf, 'source-interface', src_intf]) -        # check validate() - MACsec security keys mandartory when encryption is enabled +            tmp = get_config_value(src_interface, 'macsec_integ_only') +            self.assertTrue("0" in tmp) + +            tmp = get_config_value(src_interface, 'mka_cak') +            self.assertTrue(mak_cak in tmp) + +            tmp = get_config_value(src_interface, 'mka_ckn') +            self.assertTrue(mak_ckn in tmp) + +            # check that the default priority of 255 is programmed +            tmp = get_config_value(src_interface, 'mka_priority') +            self.assertTrue("255" in tmp) + +            tmp = get_config_value(src_interface, 'macsec_replay_window') +            self.assertTrue(replay_window in tmp) + +            tmp = read_file(f'/sys/class/net/{interface}/mtu') +            self.assertEqual(tmp, '1460') + +            # Check for running process +            self.assertTrue(process_named_running('wpa_supplicant')) + +    def test_mandatory_toptions(self): +        interface = 'macsec1' +        self.session.set(self._base_path + [interface]) + +        # check validate() - source interface is mandatory          with self.assertRaises(ConfigSessionError):              self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak]) +        self.session.set(self._base_path + [interface, 'source-interface', 'eth0']) -        # check validate() - MACsec security keys mandartory when encryption is enabled +        # check validate() - cipher is mandatory          with self.assertRaises(ConfigSessionError):              self.session.commit() -        self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn]) +        self.session.set(self._base_path + [interface, 'security', 'cipher', 'gcm-aes-128']) -        self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window]) +        # final commit and verify          self.session.commit() +        self.assertIn(interface, interfaces()) -        tmp = get_config_value(src_intf, 'macsec_integ_only') -        self.assertTrue("0" in tmp) +    def test_source_interface(self): +        """ Ensure source-interface can bot be part of any other bond or bridge """ -        tmp = get_config_value(src_intf, 'mka_cak') -        self.assertTrue(mak_cak in tmp) +        base_bridge = ['interfaces', 'bridge', 'br200'] +        base_bond = ['interfaces', 'bonding', 'bond200'] -        tmp = get_config_value(src_intf, 'mka_ckn') -        self.assertTrue(mak_ckn in tmp) +        for interface, option_value in self._options.items(): +            for option in option_value: +                self.session.set(self._base_path + [interface] + option.split()) +                if option.split()[0] == 'source-interface': +                    src_interface = option.split()[1] -        # check that the default priority of 255 is programmed -        tmp = get_config_value(src_intf, 'mka_priority') -        self.assertTrue("255" in tmp) +            self.session.set(base_bridge + ['member', 'interface', src_interface]) +            # check validate() - Source interface must not already be a member of a bridge +            with self.assertRaises(ConfigSessionError): +                self.session.commit() +            self.session.delete(base_bridge) -        tmp = get_config_value(src_intf, 'macsec_replay_window') -        self.assertTrue(replay_window in tmp) +            self.session.set(base_bond + ['member', 'interface', src_interface]) +            # check validate() - Source interface must not already be a member of a bridge +            with self.assertRaises(ConfigSessionError): +                self.session.commit() +            self.session.delete(base_bond) -        # Check for running process -        self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter())) +            # final commit and verify +            self.session.commit() +            self.assertIn(interface, interfaces())  if __name__ == '__main__':      unittest.main() + | 
