summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/test_service_ids_ddos-protection.py
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli/test_service_ids_ddos-protection.py')
-rwxr-xr-xsmoketest/scripts/cli/test_service_ids_ddos-protection.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_service_ids_ddos-protection.py b/smoketest/scripts/cli/test_service_ids_ddos-protection.py
new file mode 100755
index 000000000..91b056eea
--- /dev/null
+++ b/smoketest/scripts/cli/test_service_ids_ddos-protection.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2022 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSessionError
+from vyos.utils.process import process_named_running
+from vyos.utils.file import read_file
+
+PROCESS_NAME = 'fastnetmon'
+FASTNETMON_CONF = '/run/fastnetmon/fastnetmon.conf'
+NETWORKS_CONF = '/run/fastnetmon/networks_list'
+EXCLUDED_NETWORKS_CONF = '/run/fastnetmon/excluded_networks_list'
+base_path = ['service', 'ids', 'ddos-protection']
+
+class TestServiceIDS(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestServiceIDS, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ def tearDown(self):
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ # delete test config
+ self.cli_delete(base_path)
+ self.cli_commit()
+
+ self.assertFalse(os.path.exists(FASTNETMON_CONF))
+ self.assertFalse(process_named_running(PROCESS_NAME))
+
+ def test_fastnetmon(self):
+ networks = ['10.0.0.0/24', '10.5.5.0/24', '2001:db8:10::/64', '2001:db8:20::/64']
+ excluded_networks = ['10.0.0.1/32', '2001:db8:10::1/128']
+ interfaces = ['eth0', 'eth1']
+ fps = '3500'
+ mbps = '300'
+ pps = '60000'
+
+ self.cli_set(base_path + ['mode', 'mirror'])
+ # Required network!
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ for tmp in networks:
+ self.cli_set(base_path + ['network', tmp])
+
+ # optional excluded-network!
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ for tmp in excluded_networks:
+ self.cli_set(base_path + ['excluded-network', tmp])
+
+ # Required interface(s)!
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ for tmp in interfaces:
+ self.cli_set(base_path + ['listen-interface', tmp])
+
+ self.cli_set(base_path + ['direction', 'in'])
+ self.cli_set(base_path + ['threshold', 'general', 'fps', fps])
+ self.cli_set(base_path + ['threshold', 'general', 'pps', pps])
+ self.cli_set(base_path + ['threshold', 'general', 'mbps', mbps])
+
+ # commit changes
+ self.cli_commit()
+
+ # Check configured port
+ config = read_file(FASTNETMON_CONF)
+ self.assertIn(f'mirror_afpacket = on', config)
+ self.assertIn(f'process_incoming_traffic = on', config)
+ self.assertIn(f'process_outgoing_traffic = off', config)
+ self.assertIn(f'ban_for_flows = on', config)
+ self.assertIn(f'threshold_flows = {fps}', config)
+ self.assertIn(f'ban_for_bandwidth = on', config)
+ self.assertIn(f'threshold_mbps = {mbps}', config)
+ self.assertIn(f'ban_for_pps = on', config)
+ self.assertIn(f'threshold_pps = {pps}', config)
+ # default
+ self.assertIn(f'enable_ban = on', config)
+ self.assertIn(f'enable_ban_ipv6 = on', config)
+ self.assertIn(f'ban_time = 1900', config)
+
+ tmp = ','.join(interfaces)
+ self.assertIn(f'interfaces = {tmp}', config)
+
+
+ network_config = read_file(NETWORKS_CONF)
+ for tmp in networks:
+ self.assertIn(f'{tmp}', network_config)
+
+ excluded_network_config = read_file(EXCLUDED_NETWORKS_CONF)
+ for tmp in excluded_networks:
+ self.assertIn(f'{tmp}', excluded_network_config)
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)