From d93b1203bd2e07f8c08c0be88505e8e94f950c53 Mon Sep 17 00:00:00 2001 From: Christian Poessinger Date: Wed, 22 Dec 2021 22:01:48 +0100 Subject: smoketest: flow-accounting: add sflow and netflow testcases --- .../scripts/cli/test_system_flow-accounting.py | 152 ++++++++++++++++++++- 1 file changed, 150 insertions(+), 2 deletions(-) (limited to 'smoketest') diff --git a/smoketest/scripts/cli/test_system_flow-accounting.py b/smoketest/scripts/cli/test_system_flow-accounting.py index a2b5b1481..57866a198 100755 --- a/smoketest/scripts/cli/test_system_flow-accounting.py +++ b/smoketest/scripts/cli/test_system_flow-accounting.py @@ -47,7 +47,10 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): def test_basic(self): buffer_size = '5' # MiB + syslog = 'all' + self.cli_set(base_path + ['buffer-size', buffer_size]) + self.cli_set(base_path + ['syslog-facility', syslog]) # You need to configure at least one interface for flow-accounting with self.assertRaises(ConfigSessionError): @@ -73,8 +76,153 @@ class TestSystemFlowAccounting(VyOSUnitTestSHIM.TestCase): tmp //= 1000 self.assertIn(f'plugin_buffer_size: {tmp}', uacctd) - # Check for running process - self.assertTrue(process_named_running(PROCESS_NAME)) + # when 'disable-imt' is not configured on the CLI it must be present + self.assertIn(f'imt_path: /tmp/uacctd.pipe', uacctd) + self.assertIn(f'imt_mem_pools_number: 169', uacctd) + self.assertIn(f'syslog: {syslog}', uacctd) + self.assertIn(f'plugins: memory', uacctd) + + def test_sflow(self): + sampling_rate = '4000' + source_address = '192.0.2.1' + dummy_if = 'dum3841' + agent_address = '192.0.2.2' + + sflow_server = { + '1.2.3.4' : { + }, + '5.6.7.8' : { + 'port' : '6000' + } + } + + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', agent_address + '/32']) + self.cli_set(base_path + ['disable-imt']) + + # You need to configure at least one interface for flow-accounting + with self.assertRaises(ConfigSessionError): + self.cli_commit() + for interface in Section.interfaces('ethernet'): + self.cli_set(base_path + ['interface', interface]) + + + # You need to configure at least one sFlow or NetFlow protocol, or not + # set "disable-imt" for flow-accounting + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_set(base_path + ['sflow', 'agent-address', agent_address]) + self.cli_set(base_path + ['sflow', 'sampling-rate', sampling_rate]) + self.cli_set(base_path + ['sflow', 'source-address', source_address]) + for server, server_config in sflow_server.items(): + self.cli_set(base_path + ['sflow', 'server', server]) + if 'port' in server_config: + self.cli_set(base_path + ['sflow', 'server', server, 'port', server_config['port']]) + + # commit changes + self.cli_commit() + + uacctd = read_file(uacctd_conf) + + # when 'disable-imt' is not configured on the CLI it must be present + self.assertNotIn(f'imt_path: /tmp/uacctd.pipe', uacctd) + self.assertNotIn(f'imt_mem_pools_number: 169', uacctd) + self.assertNotIn(f'plugins: memory', uacctd) + + for server, server_config in sflow_server.items(): + if 'port' in server_config: + self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}', uacctd) + else: + self.assertIn(f'sfprobe_receiver[sf_{server}]: {server}:6343', uacctd) + + self.assertIn(f'sfprobe_agentip[sf_{server}]: {agent_address}', uacctd) + self.assertIn(f'sampling_rate[sf_{server}]: {sampling_rate}', uacctd) + self.assertIn(f'sfprobe_source_ip[sf_{server}]: {source_address}', uacctd) + + self.cli_delete(['interfaces', 'dummy', dummy_if]) + + def test_netflow(self): + engine_id = '33' + max_flows = '667' + sampling_rate = '100' + source_address = '192.0.2.1' + dummy_if = 'dum3842' + agent_address = '192.0.2.10' + version = '10' + tmo_expiry = '120' + tmo_flow = '1200' + tmo_icmp = '60' + tmo_max = '50000' + tmo_tcp_fin = '100' + tmo_tcp_generic = '120' + tmo_tcp_rst = '99' + tmo_udp = '10' + + netflow_server = { + '11.22.33.44' : { + }, + '55.66.77.88' : { + 'port' : '6000' + } + } + + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', agent_address + '/32']) + + for interface in Section.interfaces('ethernet'): + self.cli_set(base_path + ['interface', interface]) + + self.cli_set(base_path + ['netflow', 'engine-id', engine_id]) + self.cli_set(base_path + ['netflow', 'max-flows', max_flows]) + self.cli_set(base_path + ['netflow', 'sampling-rate', sampling_rate]) + self.cli_set(base_path + ['netflow', 'source-ip', source_address]) + self.cli_set(base_path + ['netflow', 'version', version]) + + # timeouts + self.cli_set(base_path + ['netflow', 'timeout', 'expiry-interval', tmo_expiry]) + self.cli_set(base_path + ['netflow', 'timeout', 'flow-generic', tmo_flow]) + self.cli_set(base_path + ['netflow', 'timeout', 'icmp', tmo_icmp]) + self.cli_set(base_path + ['netflow', 'timeout', 'max-active-life', tmo_max]) + self.cli_set(base_path + ['netflow', 'timeout', 'tcp-fin', tmo_tcp_fin]) + self.cli_set(base_path + ['netflow', 'timeout', 'tcp-generic', tmo_tcp_generic]) + self.cli_set(base_path + ['netflow', 'timeout', 'tcp-rst', tmo_tcp_rst]) + self.cli_set(base_path + ['netflow', 'timeout', 'udp', tmo_udp]) + + # You need to configure at least one netflow server + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + for server, server_config in netflow_server.items(): + self.cli_set(base_path + ['netflow', 'server', server]) + if 'port' in server_config: + self.cli_set(base_path + ['netflow', 'server', server, 'port', server_config['port']]) + + # commit changes + self.cli_commit() + + uacctd = read_file(uacctd_conf) + + tmp = 'plugins: ' + for server, server_config in netflow_server.items(): + tmp += f'nfprobe[nf_{server}],' + tmp += 'memory' + self.assertIn(f'{tmp}', uacctd) + + for server, server_config in netflow_server.items(): + self.assertIn(f'nfprobe_engine[nf_{server}]: {engine_id}', uacctd) + self.assertIn(f'nfprobe_maxflows[nf_{server}]: {max_flows}', uacctd) + self.assertIn(f'sampling_rate[nf_{server}]: {sampling_rate}', uacctd) + self.assertIn(f'nfprobe_source_ip[nf_{server}]: {source_address}', uacctd) + self.assertIn(f'nfprobe_version[nf_{server}]: {version}', uacctd) + + if 'port' in server_config: + self.assertIn(f'nfprobe_receiver[nf_{server}]: {server}', uacctd) + else: + self.assertIn(f'nfprobe_receiver[nf_{server}]: {server}:2055', uacctd) + + self.assertIn(f'nfprobe_timeouts[nf_{server}]: expint={tmo_expiry}:general={tmo_flow}:icmp={tmo_icmp}:maxlife={tmo_max}:tcp.fin={tmo_tcp_fin}:tcp={tmo_tcp_generic}:tcp.rst={tmo_tcp_rst}:udp={tmo_udp}', uacctd) + + + self.cli_delete(['interfaces', 'dummy', dummy_if]) if __name__ == '__main__': unittest.main(verbosity=2) -- cgit v1.2.3