# Copyright (C) 2019-2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
import os
import unittest
import json

from binascii import hexlify

from netifaces import ifaddresses
from netifaces import AF_INET
from netifaces import AF_INET6

from vyos.configsession import ConfigSession
from vyos.ifconfig import Interface
from vyos.util import read_file
from vyos.util import cmd
from vyos.util import dict_search
from vyos.util import process_named_running
from vyos.validate import is_intf_addr_assigned
from vyos.validate import is_ipv6_link_local

def read_mirror_rule(interfaces):
    Success = 0
    for interface in interfaces:
        get_tc_cmd = 'tc -j qdisc'
        tmp = cmd(get_tc_cmd, shell=True)
        data = json.loads(tmp)
        for rule in data:
            dev = rule['dev']
            handle = rule['handle']
            kind = rule['kind']
            if dev == interface and handle == "ffff:" and kind == "ingress":
                Success+=1
            elif dev == interface and handle == "1:" and kind == "prio":
                Success+=1
    return Success


dhcp6c_config_file = '/run/dhcp6c/dhcp6c.{}.conf'
def get_dhcp6c_config_value(interface, key):
    tmp = read_file(dhcp6c_config_file.format(interface))
    tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)

    out = []
    for item in tmp:
        out.append(item.replace(';',''))
    return out

class BasicInterfaceTest:
    class BaseTest(unittest.TestCase):
        _test_ip = False
        _test_mtu = False
        _test_vlan = False
        _test_qinq = False
        _test_ipv6 = False
        _test_mirror = False
        _base_path = []

        _options = {}
        _interfaces = []
        _qinq_range = ['10', '20', '30']
        _vlan_range = ['100', '200', '300', '2000']
        # choose IPv6 minimum MTU value for tests - this must always work
        _mtu = '1280'

        def setUp(self):
            self.session = ConfigSession(os.getpid())

            self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
                                '2001:db8:1::ffff/64', '2001:db8:101::1/112']
            self._test_mtu = False
            self._options = {}

        def tearDown(self):
            # we should not remove ethernet from the overall CLI
            if 'ethernet' in self._base_path:
                for interface in self._interfaces:
                    # when using a dedicated interface to test via TEST_ETH environment
                    # variable only this one will be cleared in the end - usable to test
                    # ethernet interfaces via SSH
                    self.session.delete(self._base_path + [interface])
                    self.session.set(self._base_path + [interface, 'duplex', 'auto'])
                    self.session.set(self._base_path + [interface, 'speed', 'auto'])
            else:
                self.session.delete(self._base_path)

            self.session.commit()
            del self.session

        def test_mirror(self):
            # Test is disabled as it contains hardcoded bond interfaces which will
            # screw up all kinds of live deployments.
            return None

            if self._test_mirror:
                # Create test dependency interface
                self.session.set(['interfaces','dummy','dum0'])
                self.session.set(['interfaces','dummy','dum1'])
                self.session.set(['interfaces','bonding','bond1','member','interface','dum0'])
                self.session.set(['interfaces','bonding','bond1','member','interface','dum1'])

                # ^- WHY? There is self._options for that :(

                Success = 0
                i = 0
                # Check the two-way mirror rules of ingress and egress
                for interface in self._interfaces:
                    self.session.set(self._base_path + [interface, 'mirror', 'ingress', 'bond1'])
                    self.session.set(self._base_path + [interface, 'mirror', 'egress', 'bond1'])
                    i+=1
                self.session.commit()
                # Parse configuration
                Success = read_mirror_rule(self._interfaces)
                if Success == i*2:
                    self.assertTrue(True)
                else:
                    self.assertTrue(False)
                i=0
                self.session.delete(['interfaces','dummy'])
                self.session.delete(['interfaces','bonding'])


        def test_add_description(self):
            """
            Check if description can be added to interface
            """
            for intf in self._interfaces:
                test_string=f'Description-Test-{intf}'
                self.session.set(self._base_path + [intf, 'description', test_string])
                for option in self._options.get(intf, []):
                    self.session.set(self._base_path + [intf] + option.split())

            self.session.commit()

            # Validate interface description
            for intf in self._interfaces:
                test_string=f'Description-Test-{intf}'
                with open(f'/sys/class/net/{intf}/ifalias', 'r') as f:
                    tmp = f.read().rstrip()
                    self.assertTrue(tmp, test_string)

        def test_add_address_single(self):
            """
            Check if a single address can be added to interface.
            """
            addr = '192.0.2.0/31'
            for intf in self._interfaces:
                self.session.set(self._base_path + [intf, 'address', addr])
                for option in self._options.get(intf, []):
                    self.session.set(self._base_path + [intf] + option.split())

            self.session.commit()

            for intf in self._interfaces:
                self.assertTrue(is_intf_addr_assigned(intf, addr))

        def test_add_address_multi(self):
            """
            Check if IPv4/IPv6 addresses can be added to interface.
            """

            # Add address
            for intf in self._interfaces:
                for addr in self._test_addr:
                    self.session.set(self._base_path + [intf, 'address', addr])
                    for option in self._options.get(intf, []):
                        self.session.set(self._base_path + [intf] + option.split())

            self.session.commit()

            # Validate address
            for intf in self._interfaces:
                for af in AF_INET, AF_INET6:
                    for addr in ifaddresses(intf)[af]:
                        # checking link local addresses makes no sense
                        if is_ipv6_link_local(addr['addr']):
                            continue

                        self.assertTrue(is_intf_addr_assigned(intf, addr['addr']))

        def test_ipv6_link_local(self):
            """ Common function for IPv6 link-local address assignemnts """
            if not self._test_ipv6:
                return None

            for interface in self._interfaces:
                base = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(base + option.split())

            # after commit we must have an IPv6 link-local address
            self.session.commit()

            for interface in self._interfaces:
                for addr in ifaddresses(interface)[AF_INET6]:
                    self.assertTrue(is_ipv6_link_local(addr['addr']))

            # disable IPv6 link-local address assignment
            for interface in self._interfaces:
                base = self._base_path + [interface]
                self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])

            # after commit we must have no IPv6 link-local address
            self.session.commit()

            for interface in self._interfaces:
                self.assertTrue(AF_INET6 not in ifaddresses(interface))

        def _mtu_test(self, intf):
            """ helper function to verify MTU size """
            with open(f'/sys/class/net/{intf}/mtu', 'r') as f:
                tmp = f.read().rstrip()
                self.assertEqual(tmp, self._mtu)

        def test_change_mtu(self):
            """ Testcase if MTU can be changed on interface """
            if not self._test_mtu:
                return None

            for intf in self._interfaces:
                base = self._base_path + [intf]
                self.session.set(base + ['mtu', self._mtu])
                for option in self._options.get(intf, []):
                    self.session.set(base + option.split())

            # commit interface changes
            self.session.commit()

            # verify changed MTU
            for intf in self._interfaces:
                self._mtu_test(intf)

        def test_change_mtu_1200(self):
            """ Testcase if MTU can be changed to 1200 on non IPv6 enabled interfaces """
            if not self._test_mtu:
                return None

            old_mtu = self._mtu
            self._mtu = '1200'

            for intf in self._interfaces:
                base = self._base_path + [intf]
                self.session.set(base + ['mtu', self._mtu])
                self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])

                for option in self._options.get(intf, []):
                    self.session.set(base + option.split())

            # commit interface changes
            self.session.commit()

            # verify changed MTU
            for intf in self._interfaces:
                self._mtu_test(intf)

            self._mtu = old_mtu

        def test_8021q_vlan(self):
            """ Testcase for 802.1q VLAN interfaces """
            if not self._test_vlan:
                return None

            for interface in self._interfaces:
                base = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(base + option.split())

                for vlan in self._vlan_range:
                    base = self._base_path + [interface, 'vif', vlan]
                    self.session.set(base + ['mtu', self._mtu])
                    for address in self._test_addr:
                        self.session.set(base + ['address', address])

            self.session.commit()
            for intf in self._interfaces:
                for vlan in self._vlan_range:
                    vif = f'{intf}.{vlan}'
                    for address in self._test_addr:
                        self.assertTrue(is_intf_addr_assigned(vif, address))
                    self._mtu_test(vif)


        def test_8021ad_qinq_vlan(self):
            """ Testcase for 802.1ad Q-in-Q VLAN interfaces """
            if not self._test_qinq:
                return None

            for interface in self._interfaces:
                base = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(base + option.split())

                for vif_s in self._qinq_range:
                    for vif_c in self._vlan_range:
                        base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
                        self.session.set(base + ['mtu', self._mtu])
                        for address in self._test_addr:
                            self.session.set(base + ['address', address])

            self.session.commit()

            for interface in self._interfaces:
                for vif_s in self._qinq_range:
                    tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0]
                    self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')

                    for vif_c in self._vlan_range:
                        vif = f'{interface}.{vif_s}.{vif_c}'
                        for address in self._test_addr:
                            self.assertTrue(is_intf_addr_assigned(vif, address))
                        self._mtu_test(vif)

        def test_ip_options(self):
            """ Test interface base IPv4 options """
            if not self._test_ip:
                return None

            for interface in self._interfaces:
                arp_tmo = '300'
                path = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(path + option.split())

                # Options
                self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
                self.session.set(path + ['ip', 'disable-arp-filter'])
                self.session.set(path + ['ip', 'disable-forwarding'])
                self.session.set(path + ['ip', 'enable-arp-accept'])
                self.session.set(path + ['ip', 'enable-arp-announce'])
                self.session.set(path + ['ip', 'enable-arp-ignore'])
                self.session.set(path + ['ip', 'enable-proxy-arp'])
                self.session.set(path + ['ip', 'proxy-arp-pvlan'])
                self.session.set(path + ['ip', 'source-validation', 'loose'])

            self.session.commit()

            for interface in self._interfaces:
                tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
                self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_filter')
                self.assertEqual('0', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_accept')
                self.assertEqual('1', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_announce')
                self.assertEqual('1', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore')
                self.assertEqual('1', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/forwarding')
                self.assertEqual('0', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp')
                self.assertEqual('1', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan')
                self.assertEqual('1', tmp)

                tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter')
                self.assertEqual('2', tmp)

        def test_ipv6_options(self):
            """ Test interface base IPv6 options """
            if not self._test_ipv6:
                return None

            for interface in self._interfaces:
                dad_transmits = '10'
                path = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(path + option.split())

                # Options
                self.session.set(path + ['ipv6', 'disable-forwarding'])
                self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])

            self.session.commit()

            for interface in self._interfaces:
                tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding')
                self.assertEqual('0', tmp)

                tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/dad_transmits')
                self.assertEqual(dad_transmits, tmp)

        def test_ipv6_dhcpv6_duid(self):
            """ Test interface base IPv6 options """
            if not self._test_ipv6:
                return None

            for interface in self._interfaces:
                path = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(path + option.split())

                # Options
                duid = '0e:00:00:01:00:01:27:71:db:f0:00:50:56:bf:c5:6d'
                self.session.set(path + ['dhcpv6-options', 'duid', duid])

            self.session.commit()

            for interface in self._interfaces:
                with open('/var/lib/dhcpv6/dhcp6c_duid', 'rb') as f:
                    tmp = hexlify(f.read()).decode()

                self.assertEqual(duid.replace(':',''), tmp)

        def test_ipv6_dhcpv6_pd(self):
            """ Test interface base IPv6 options """
            if not self._test_ipv6:
                return None

            address = '1'
            sla_id = '0'
            sla_len = '8'
            for interface in self._interfaces:
                path = self._base_path + [interface]
                for option in self._options.get(interface, []):
                    self.session.set(path + option.split())

                # prefix delegation stuff
                pd_base = path + ['dhcpv6-options', 'pd', '0']
                self.session.set(pd_base + ['length', '56'])
                self.session.set(pd_base + ['interface', interface, 'address', address])
                self.session.set(pd_base + ['interface', interface, 'sla-id',  sla_id])

            self.session.commit()

            for interface in self._interfaces:
                # verify DHCPv6 prefix delegation
                # will return: ['delegation', '::/56 infinity;']
                tmp = get_dhcp6c_config_value(interface, 'prefix')[1].split()[0] # mind the whitespace
                self.assertEqual(tmp, '::/56')
                tmp = get_dhcp6c_config_value(interface, 'prefix-interface')[0].split()[0]
                self.assertEqual(tmp, interface)
                tmp = get_dhcp6c_config_value(interface, 'ifid')[0]
                self.assertEqual(tmp, address)
                tmp = get_dhcp6c_config_value(interface, 'sla-id')[0]
                self.assertEqual(tmp, sla_id)
                tmp = get_dhcp6c_config_value(interface, 'sla-len')[0]
                self.assertEqual(tmp, sla_len)

            # Check for running process
            self.assertTrue(process_named_running('dhcp6c'))