#!/usr/bin/env python3
#
# Copyright (C) 2020-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import json
import unittest

from base_interfaces_test import BasicInterfaceTest
from glob import glob
from netifaces import interfaces

from vyos.ifconfig import Section
from vyos.util import cmd
from vyos.util import read_file
from vyos.util import get_interface_config
from vyos.validate import is_intf_addr_assigned

class BridgeInterfaceTest(BasicInterfaceTest.TestCase):
    """Smoketests for the 'interfaces bridge' CLI subtree.

    A single bridge (br0) is set up with every physical ethernet interface
    as a member port. The generic interface tests are inherited from
    BasicInterfaceTest.TestCase; the methods defined here add the
    bridge specific checks (STP, port isolation, VLAN filtering and
    VLAN sub-interfaces used as member ports).
    """

    @classmethod
    def setUpClass(cls):
        # Feature switches and paths read by the inherited generic tests
        cls._test_dhcp = True
        cls._test_ip = True
        cls._test_ipv6 = True
        cls._test_ipv6_pd = True
        cls._test_ipv6_dhcpc6 = True
        cls._test_vlan = True
        cls._base_path = ['interfaces', 'bridge']
        cls._mirror_interfaces = ['dum21354']

        # The member interfaces can be pinned via the TEST_ETH environment
        # variable (whitespace separated list of interface names).
        if 'TEST_ETH' in os.environ:
            cls._members = os.environ['TEST_ETH'].split()
        else:
            # we need to filter out VLAN interfaces identified by a dot (.)
            # in their name - just in case!
            cls._members = [name for name in Section.interfaces('ethernet')
                            if '.' not in name]

        # _options is not defined in this class, so it is expected to be
        # provided by the base class.
        cls._options['br0'] = [f'member interface {member}'
                               for member in cls._members]
        cls._interfaces = list(cls._options)

        # call base-classes classmethod
        # Zero-argument super() resolves relative to this class. The previous
        # super(cls, cls) form resolved relative to the runtime class and
        # would recurse endlessly as soon as this test class is subclassed.
        super().setUpClass()

    def tearDown(self):
        # Queue removal of every bridge created by the test before handing
        # over to the base class tearDown().
        for intf in self._interfaces:
            self.cli_delete(self._base_path + [intf])

        super().tearDown()

    def _enable_vlan_on_bridges(self):
        # Queue 'enable-vlan' on every bridge under test. Shared by the
        # overridden VLAN tests below, which set this option before
        # delegating to the inherited test.
        for interface in self._interfaces:
            self.cli_set(self._base_path + [interface, 'enable-vlan'])

    def test_isolated_interfaces(self):
        # Enable STP on every bridge and flag every member port as isolated,
        # then verify both settings in the data returned by
        # get_interface_config().
        for interface in self._interfaces:
            base = self._base_path + [interface]
            self.cli_set(base + ['stp'])

            # assign members to bridge interface
            for member in self._members:
                base_member = base + ['member', 'interface', member]
                self.cli_set(base_member + ['isolated'])

        # commit config
        self.cli_commit()

        for interface in self._interfaces:
            bridge_config = get_interface_config(interface)
            # STP must be enabled as configured above
            self.assertEqual(1, bridge_config['linkinfo']['info_data']['stp_state'])

            # validate member interface configuration
            for member in self._members:
                member_config = get_interface_config(member)
                # Isolated must be enabled as configured above
                self.assertTrue(member_config['linkinfo']['info_slave_data']['isolated'])

    def test_add_remove_bridge_member(self):
        # Add member interfaces to the bridge with an individual STP cost and
        # priority per port, then verify the values on every member.
        first_cost = 1000
        first_priority = 10

        for interface in self._interfaces:
            base = self._base_path + [interface]
            self.cli_set(base + ['stp'])
            self.cli_set(base + ['address', '192.0.2.1/24'])

            # assign members to bridge interface - every member gets a value
            # one higher than the previous one
            for offset, member in enumerate(self._members):
                base_member = base + ['member', 'interface', member]
                self.cli_set(base_member + ['cost', str(first_cost + offset)])
                self.cli_set(base_member + ['priority', str(first_priority + offset)])

        # commit config
        self.cli_commit()

        # Verify bridge membership and the STP cost/priority of every member
        for interface in self._interfaces:
            for offset, member in enumerate(self._members):
                member_config = get_interface_config(member)
                slave_data = member_config['linkinfo']['info_slave_data']

                self.assertEqual(interface, member_config['master'])
                # isolated was not configured by this test
                self.assertFalse(slave_data['isolated'])
                self.assertEqual(first_cost + offset, slave_data['cost'])
                self.assertEqual(first_priority + offset, slave_data['priority'])

    def test_vif_8021q_interfaces(self):
        # Run the inherited test with 'enable-vlan' set on the bridge
        self._enable_vlan_on_bridges()
        super().test_vif_8021q_interfaces()

    def test_vif_8021q_lower_up_down(self):
        # Run the inherited test with 'enable-vlan' set on the bridge
        self._enable_vlan_on_bridges()
        super().test_vif_8021q_lower_up_down()

    def test_vif_8021q_mtu_limits(self):
        # Run the inherited test with 'enable-vlan' set on the bridge
        self._enable_vlan_on_bridges()
        super().test_vif_8021q_mtu_limits()

    def test_bridge_vlan_filter(self):
        # Configure a VLAN aware bridge (allowed-vlan / native-vlan per member
        # port), verify the resulting setup, make sure it survives an
        # unrelated change on the member interfaces (T4565) and finally
        # remove all members again.
        def _get_bridge_members(interface: str) -> list:
            # Names of all ports enslaved to the bridge, taken from the
            # lower_<ifname> entries in the sysfs directory of the bridge
            return [os.path.basename(path).replace('lower_', '')
                    for path in glob(f'/sys/class/net/{interface}/lower_*')]

        def _verify_members() -> None:
            # check member interfaces are added on the bridge
            for interface in self._interfaces:
                bridge_members = _get_bridge_members(interface)

                # We can not use assertListEqual() b/c the position of the interface
                # names within the list is not fixed
                self.assertEqual(len(self._members), len(bridge_members))
                for member in self._members:
                    self.assertIn(member, bridge_members)

        def _check_vlan_filter() -> None:
            # NOTE(review): the query below is limited to the bridge device
            # itself. Confirm that its output really contains entries for the
            # member ports - otherwise the assertion further down never runs.
            for interface in self._interfaces:
                tmp = cmd(f'bridge -j vlan show dev {interface}')
                tmp = json.loads(tmp)
                self.assertIsNotNone(tmp)

                for interface_status in tmp:
                    ifname = interface_status['ifname']
                    for member in self._members:
                        if member != ifname:
                            continue

                        # Count the VLANs which show up as configured below
                        vlan_success = 0
                        for vlan_status in interface_status['vlans']:
                            vlan_id = vlan_status['vlan']
                            flag_num = len(vlan_status.get('flags', []))

                            if vlan_id == 2 or vlan_id in range(4, 10):
                                # allowed-vlan entries carry no flags
                                if flag_num == 0:
                                    vlan_success += 1
                            elif vlan_id >= 101:
                                # native-vlan entries carry two flags
                                if flag_num == 2:
                                    vlan_success += 1

                        self.assertGreaterEqual(vlan_success, 7)

        vif_vlan = 2
        allowed_vlan = 2
        allowed_vlan_range = '4-9'
        first_native_vlan = 101

        # Add member interface to bridge and set VLAN filter
        for interface in self._interfaces:
            base = self._base_path + [interface]
            self.cli_set(base + ['enable-vlan'])
            self.cli_set(base + ['address', '192.0.2.1/24'])
            self.cli_set(base + ['vif', str(vif_vlan), 'address', '192.0.3.1/24'])
            self.cli_set(base + ['vif', str(vif_vlan), 'mtu', self._mtu])

            # assign members to bridge interface - every member gets its own
            # native VLAN, counting up from first_native_vlan
            for offset, member in enumerate(self._members):
                base_member = base + ['member', 'interface', member]
                self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)])
                self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range])
                self.cli_set(base_member + ['native-vlan', str(first_native_vlan + offset)])

        # commit config
        self.cli_commit()

        # Verify correct setting of VLAN filter function
        for interface in self._interfaces:
            tmp = read_file(f'/sys/class/net/{interface}/bridge/vlan_filtering')
            self.assertEqual(tmp, '1')

        # Execute the program to obtain status information and verify proper
        # VLAN filter setup
        _check_vlan_filter()

        # check member interfaces are added on the bridge
        _verify_members()

        # change member interface description to trigger config update,
        # VLANs must still exist (T4565)
        for interface in self._interfaces:
            for member in self._members:
                self.cli_set(['interfaces', Section.section(member), member, 'description', f'foo {member}'])

        # commit config
        self.cli_commit()

        # check member interfaces are added on the bridge
        _verify_members()

        # Execute the program to obtain status information and verify proper
        # VLAN filter setup
        _check_vlan_filter()

        # delete all members
        for interface in self._interfaces:
            self.cli_delete(self._base_path + [interface, 'member'])

        # commit config
        self.cli_commit()

        # verify member interfaces are no longer assigned on the bridge
        for interface in self._interfaces:
            bridge_members = _get_bridge_members(interface)

            self.assertNotEqual(len(self._members), len(bridge_members))
            for member in self._members:
                self.assertNotIn(member, bridge_members)

    def test_bridge_vif_members(self):
        # T2945: ensure that VIFs are not dropped from bridge
        vifs = ['300', '400']
        for interface in self._interfaces:
            for member in self._members:
                for vif in vifs:
                    self.cli_set(['interfaces', 'ethernet', member, 'vif', vif])
                    self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}'])

        self.cli_commit()

        # Verify config
        for interface in self._interfaces:
            for member in self._members:
                for vif in vifs:
                    # member interface must be assigned to the bridge
                    self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif}'))

        # delete all members - no commit is issued here, the deletions are
        # only queued by this test
        for interface in self._interfaces:
            for member in self._members:
                for vif in vifs:
                    self.cli_delete(['interfaces', 'ethernet', member, 'vif', vif])
                    self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}'])

    def test_bridge_vif_s_vif_c_members(self):
        # T2945: ensure that VIFs are not dropped from bridge
        vifs = ['300', '400']
        vifc = ['301', '401']
        for interface in self._interfaces:
            for member in self._members:
                for vif_s in vifs:
                    for vif_c in vifc:
                        self.cli_set(['interfaces', 'ethernet', member, 'vif-s', vif_s, 'vif-c', vif_c])
                        self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}'])

        self.cli_commit()

        # Verify config
        for interface in self._interfaces:
            for member in self._members:
                for vif_s in vifs:
                    for vif_c in vifc:
                        # member interface must be assigned to the bridge
                        self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif_s}.{vif_c}'))

        # delete all members - no commit is issued here, the deletions are
        # only queued by this test
        for interface in self._interfaces:
            for member in self._members:
                for vif_s in vifs:
                    # removing the vif-s node also covers its vif-c children
                    self.cli_delete(['interfaces', 'ethernet', member, 'vif-s', vif_s])
                    for vif_c in vifc:
                        self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif_s}.{vif_c}'])

# Run all tests of this module when the file is executed as a script;
# verbosity=2 lists every test together with its result.
if __name__ == '__main__':
    unittest.main(verbosity=2)