#!/usr/bin/env python3
#
# Copyright (C) 2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import jmespath
import json
import unittest

from base_vyostest_shim import VyOSUnitTestSHIM

from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import dict_search

base_path = ['nat66']
src_path = base_path + ['source']
dst_path = base_path + ['destination']

class TestNAT66(VyOSUnitTestSHIM.TestCase):
    @classmethod
    def setUpClass(cls):
        super(TestNAT66, cls).setUpClass()

        # ensure we can also run this test on a live system - so lets clean
        # out the current configuration :)
        cls.cli_delete(cls, base_path)

    def tearDown(self):
        self.cli_delete(base_path)
        self.cli_commit()

    def verify_nftables(self, nftables_search, table, inverse=False, args=''):
        nftables_output = cmd(f'sudo nft {args} list table {table}')

        for search in nftables_search:
            matched = False
            for line in nftables_output.split("\n"):
                if all(item in line for item in search):
                    matched = True
                    break
            self.assertTrue(not matched if inverse else matched, msg=search)

    def test_source_nat66(self):
        source_prefix = 'fc00::/64'
        translation_prefix = 'fc01::/64'
        self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'eth1'])
        self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix])
        self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_prefix])

        self.cli_set(src_path + ['rule', '2', 'outbound-interface', 'eth1'])
        self.cli_set(src_path + ['rule', '2', 'source', 'prefix', source_prefix])
        self.cli_set(src_path + ['rule', '2', 'translation', 'address', 'masquerade'])

        self.cli_set(src_path + ['rule', '3', 'outbound-interface', 'eth1'])
        self.cli_set(src_path + ['rule', '3', 'source', 'prefix', source_prefix])
        self.cli_set(src_path + ['rule', '3', 'exclude'])

        self.cli_commit()

        nftables_search = [
            ['oifname "eth1"', 'ip6 saddr fc00::/64', 'snat prefix to fc01::/64'],
            ['oifname "eth1"', 'ip6 saddr fc00::/64', 'masquerade'],
            ['oifname "eth1"', 'ip6 saddr fc00::/64', 'return']
        ]

        self.verify_nftables(nftables_search, 'ip6 nat')

    def test_source_nat66_address(self):
        source_prefix = 'fc00::/64'
        translation_address = 'fc00::1'
        self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'eth1'])
        self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix])
        self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address])

        # check validate() - outbound-interface must be defined
        self.cli_commit()

        tmp = cmd('sudo nft -j list table ip6 nat')
        data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))

        for idx in range(0, len(data_json)):
            data = data_json[idx]

            self.assertEqual(data['chain'], 'POSTROUTING')
            self.assertEqual(data['family'], 'ip6')
            self.assertEqual(data['table'], 'nat')

            iface = dict_search('match.right', data['expr'][0])
            address = dict_search('match.right.prefix.addr', data['expr'][2])
            mask = dict_search('match.right.prefix.len', data['expr'][2])
            snat_address = dict_search('snat.addr', data['expr'][3])

            self.assertEqual(iface, 'eth1')
            # check for translation address
            self.assertEqual(snat_address, translation_address)
            self.assertEqual(f'{address}/{mask}', source_prefix)

    def test_destination_nat66(self):
        destination_address = 'fc00::1'
        translation_address = 'fc01::1'
        source_address = 'fc02::1'
        self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
        self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_address])
        self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address])

        self.cli_set(dst_path + ['rule', '2', 'inbound-interface', 'eth1'])
        self.cli_set(dst_path + ['rule', '2', 'destination', 'address', destination_address])
        self.cli_set(dst_path + ['rule', '2', 'source', 'address', source_address])
        self.cli_set(dst_path + ['rule', '2', 'exclude'])

        # check validate() - outbound-interface must be defined
        self.cli_commit()

        nftables_search = [
            ['iifname "eth1"', 'ip6 daddr fc00::1', 'dnat to fc01::1'],
            ['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return']
        ]

        self.verify_nftables(nftables_search, 'ip6 nat')

    def test_destination_nat66_prefix(self):
        destination_prefix = 'fc00::/64'
        translation_prefix = 'fc01::/64'
        self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
        self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_prefix])
        self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_prefix])

        # check validate() - outbound-interface must be defined
        self.cli_commit()

        tmp = cmd('sudo nft -j list table ip6 nat')
        data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))

        for idx in range(0, len(data_json)):
            data = data_json[idx]

            self.assertEqual(data['chain'], 'PREROUTING')
            self.assertEqual(data['family'], 'ip6')
            self.assertEqual(data['table'], 'nat')

            iface = dict_search('match.right', data['expr'][0])
            translation_address = dict_search('dnat.addr.prefix.addr', data['expr'][3])
            translation_mask = dict_search('dnat.addr.prefix.len', data['expr'][3])

            self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix)
            self.assertEqual(iface, 'eth1')

    def test_source_nat66_required_translation_prefix(self):
        # T2813: Ensure translation address is specified
        rule = '5'
        source_prefix = 'fc00::/64'
        self.cli_set(src_path + ['rule', rule, 'source', 'prefix', source_prefix])

        # check validate() - outbound-interface must be defined
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()
        self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])

        # check validate() - translation address not specified
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()

        self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
        self.cli_commit()

    def test_nat66_no_rules(self):
        # T3206: deleting all rules but keep the direction 'destination' or
        # 'source' resulteds in KeyError: 'rule'.
        #
        # Test that both 'nat destination' and 'nat source' nodes can exist
        # without any rule
        self.cli_set(src_path)
        self.cli_set(dst_path)
        self.cli_commit()

if __name__ == '__main__':
    unittest.main(verbosity=2)