diff options
| -rwxr-xr-x | smoketest/scripts/cli/test_service_ids.py | 93 | 
1 files changed, 93 insertions, 0 deletions
| diff --git a/smoketest/scripts/cli/test_service_ids.py b/smoketest/scripts/cli/test_service_ids.py new file mode 100755 index 000000000..e5f7ca071 --- /dev/null +++ b/smoketest/scripts/cli/test_service_ids.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSessionError +from vyos.util import process_named_running +from vyos.util import read_file + +PROCESS_NAME = 'fastnetmon' +FASTNETMON_CONF = '/etc/fastnetmon.conf' +base_path = ['service', 'ids', 'ddos-protection'] + +class TestServiceIDS(VyOSUnitTestSHIM.TestCase): +    @classmethod +    def setUpClass(cls): +        super(cls, cls).setUpClass() + +        # ensure we can also run this test on a live system - so lets clean +        # out the current configuration :) +        cls.cli_delete(cls, base_path) + +    def tearDown(self): +        # Check for running process +        self.assertTrue(process_named_running(PROCESS_NAME)) + +        # delete test config +        self.cli_delete(base_path) +        self.cli_commit() + +        self.assertFalse(os.path.exists(FASTNETMON_CONF)) +        self.assertFalse(process_named_running(PROCESS_NAME)) + +    def test_fastnetmon(self): +        networks = ['10.0.0.0/24', '10.5.5.0/24'] +        interfaces = ['eth0', 'eth1'] +        fps = '3500' +        mbps = '300' +        pps = '60000' + +        self.cli_set(base_path + ['mode', 'mirror']) +        # Required network! +        with self.assertRaises(ConfigSessionError): +            self.cli_commit() +        for tmp in networks: +            self.cli_set(base_path + ['network', tmp]) + +        # Required interface(s)! +        with self.assertRaises(ConfigSessionError): +            self.cli_commit() +        for tmp in interfaces: +            self.cli_set(base_path + ['listen-interface', tmp]) + +        self.cli_set(base_path + ['direction', 'in']) +        self.cli_set(base_path + ['threshold', 'fps', fps]) +        self.cli_set(base_path + ['threshold', 'pps', pps]) +        self.cli_set(base_path + ['threshold', 'mbps', mbps]) + +        # commit changes +        self.cli_commit() + +        # Check configured port +        config = read_file(FASTNETMON_CONF) +        self.assertIn(f'mirror_afpacket = on', config) +        self.assertIn(f'process_incoming_traffic = on', config) +        self.assertIn(f'process_outgoing_traffic = off', config) +        self.assertIn(f'ban_for_flows = on', config) +        self.assertIn(f'threshold_flows = {fps}', config) +        self.assertIn(f'ban_for_bandwidth = on', config) +        self.assertIn(f'threshold_mbps = {mbps}', config) +        self.assertIn(f'ban_for_pps = on', config) +        self.assertIn(f'threshold_pps = {pps}', config) + +        self.assertIn(f'interfaces = {interfaces.join(",")}', config) + +if __name__ == '__main__': +    unittest.main(verbosity=2) | 
