diff options
| -rw-r--r-- | interface-definitions/include/nat64/protocol.xml.i (renamed from interface-definitions/include/nat64-protocol.xml.i) | 2 | ||||
| -rw-r--r-- | interface-definitions/nat64.xml.in | 2 | ||||
| -rw-r--r-- | python/vyos/utils/file.py | 14 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_nat64.py | 102 | ||||
| -rwxr-xr-x | src/conf_mode/nat64.py | 25 | 
5 files changed, 108 insertions, 37 deletions
| diff --git a/interface-definitions/include/nat64-protocol.xml.i b/interface-definitions/include/nat64/protocol.xml.i index 9432e6a87..a640873b5 100644 --- a/interface-definitions/include/nat64-protocol.xml.i +++ b/interface-definitions/include/nat64/protocol.xml.i @@ -1,4 +1,4 @@ -<!-- include start from nat64-protocol.xml.i --> +<!-- include start from nat64/protocol.xml.i -->  <node name="protocol">    <properties>      <help>Apply translation address to a specfic protocol</help> diff --git a/interface-definitions/nat64.xml.in b/interface-definitions/nat64.xml.in index 1522395e4..baf13e6cb 100644 --- a/interface-definitions/nat64.xml.in +++ b/interface-definitions/nat64.xml.in @@ -66,7 +66,7 @@                        #include <include/generic-description.xml.i>                        #include <include/generic-disable-node.xml.i>                        #include <include/nat-translation-port.xml.i> -                      #include <include/nat64-protocol.xml.i> +                      #include <include/nat64/protocol.xml.i>                        <leafNode name="address">                          <properties>                            <help>IPv4 address or prefix to translate to</help> diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 2b1f23b34..9f27a7fb9 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -83,20 +83,6 @@ def read_json(fname, defaultonfailure=None):              return defaultonfailure          raise e -def write_json(fname, data, indent=2, defaultonfailure=None): -    """ -    encode data to json and write to a file -    should defaultonfailure be not None, it is returned on failure to write -    """ -    import json -    try: -        with open(fname, 'w') as f: -            json.dump(data, f, indent=indent) -    except Exception as e: -        if defaultonfailure is not None: -            return defaultonfailure -        raise e -  def chown(path, user, group):      """ change file/directory owner """      from pwd import getpwnam diff --git a/smoketest/scripts/cli/test_nat64.py b/smoketest/scripts/cli/test_nat64.py new file mode 100755 index 000000000..b5723ac7e --- /dev/null +++ b/smoketest/scripts/cli/test_nat64.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2023 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. + +import json +import os +import unittest + +from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.configsession import ConfigSessionError +from vyos.utils.process import cmd +from vyos.utils.dict import dict_search + +base_path = ['nat64'] +src_path = base_path + ['source'] + +jool_nat64_config = '/run/jool/instance-100.json' + + +class TestNAT64(VyOSUnitTestSHIM.TestCase): +    @classmethod +    def setUpClass(cls): +        super(TestNAT64, cls).setUpClass() + +        # ensure we can also run this test on a live system - so lets clean +        # out the current configuration :) +        cls.cli_delete(cls, base_path) + +    def tearDown(self): +        self.cli_delete(base_path) +        self.cli_commit() +        self.assertFalse(os.path.exists(jool_nat64_config)) + +    def test_snat64(self): +        rule = '100' +        translation_rule = '10' +        prefix_v6 = '64:ff9b::/96' +        pool = '192.0.2.10' +        pool_port = '1-65535' + +        self.cli_set(src_path + ['rule', rule, 'source', 'prefix', prefix_v6]) +        self.cli_set( +            src_path +            + ['rule', rule, 'translation', 'pool', translation_rule, 'address', pool] +        ) +        self.cli_set( +            src_path +            + ['rule', rule, 'translation', 'pool', translation_rule, 'port', pool_port] +        ) +        self.cli_commit() + +        # Load the JSON file +        with open(f'/run/jool/instance-{rule}.json', 'r') as json_file: +            config_data = json.load(json_file) + +        # Assertions based on the content of the JSON file +        self.assertEqual(config_data['instance'], f'instance-{rule}') +        self.assertEqual(config_data['framework'], 'netfilter') +        self.assertEqual(config_data['global']['pool6'], prefix_v6) +        self.assertTrue(config_data['global']['manually-enabled']) + +        # Check the pool4 entries +        pool4_entries = config_data.get('pool4', []) +        self.assertIsInstance(pool4_entries, list) +        self.assertGreater(len(pool4_entries), 0) + +        for entry in pool4_entries: +            self.assertIn('protocol', entry) +            self.assertIn('prefix', entry) +            self.assertIn('port range', entry) + +            protocol = entry['protocol'] +            prefix = entry['prefix'] +            port_range = entry['port range'] + +            if protocol == 'ICMP': +                self.assertEqual(prefix, pool) +                self.assertEqual(port_range, pool_port) +            elif protocol == 'UDP': +                self.assertEqual(prefix, pool) +                self.assertEqual(port_range, pool_port) +            elif protocol == 'TCP': +                self.assertEqual(prefix, pool) +                self.assertEqual(port_range, pool_port) +            else: +                self.fail(f'Unexpected protocol: {protocol}') + + +if __name__ == '__main__': +    unittest.main(verbosity=2) diff --git a/src/conf_mode/nat64.py b/src/conf_mode/nat64.py index d4df479ac..a8b90fb11 100755 --- a/src/conf_mode/nat64.py +++ b/src/conf_mode/nat64.py @@ -21,6 +21,7 @@ import os  import re  from ipaddress import IPv6Network +from json import dumps as json_write  from vyos import ConfigError  from vyos import airbag @@ -28,7 +29,7 @@ from vyos.config import Config  from vyos.configdict import dict_merge  from vyos.configdict import is_node_changed  from vyos.utils.dict import dict_search -from vyos.utils.file import write_json +from vyos.utils.file import write_file  from vyos.utils.kernel import check_kmod  from vyos.utils.process import cmd  from vyos.utils.process import run @@ -40,27 +41,12 @@ JOOL_CONFIG_DIR = "/run/jool"  def get_config(config: Config | None = None) -> None: -    """ """      if config is None:          config = Config()      base = ["nat64"]      nat64 = config.get_config_dict(base, key_mangling=("-", "_"), get_first_key=True) -    # T2665: we must add the tagNode defaults individually until this is -    # moved to the base class -    for direction in ["source"]: -        if direction in nat64: -            default_values = defaults(base + [direction, "rule"]) -            if "rule" in nat64[direction]: -                for rule in nat64[direction]["rule"]: -                    nat64[direction]["rule"][rule] = dict_merge( -                        default_values, nat64[direction]["rule"][rule] -                    ) - -                    # Only support netfilter for now -                    nat64[direction]["rule"][rule]["mode"] = "netfilter" -      base_src = base + ["source", "rule"]      # Load in existing instances so we can destroy any unknown @@ -95,7 +81,6 @@ def get_config(config: Config | None = None) -> None:  def verify(nat64) -> None: -    """ """      if not nat64:          # no need to verify the CLI as nat64 is going to be deactivated          return @@ -103,7 +88,7 @@ def verify(nat64) -> None:      if dict_search("source.rule", nat64):          # Ensure only 1 netfilter instance per namespace          nf_rules = filter( -            lambda i: "deleted" not in i and i["mode"] == "netfilter", +            lambda i: "deleted" not in i and i.get('mode') == "netfilter",              nat64["source"]["rule"].values(),          )          next(nf_rules, None)  # Discard the first element @@ -138,7 +123,6 @@ def verify(nat64) -> None:  def generate(nat64) -> None: -    """ """      os.makedirs(JOOL_CONFIG_DIR, exist_ok=True)      if dict_search("source.rule", nat64): @@ -183,11 +167,10 @@ def generate(nat64) -> None:                  if pool4:                      config["pool4"] = pool4 -            write_json(f"{JOOL_CONFIG_DIR}/{name}.json", config) +            write_file(f'{JOOL_CONFIG_DIR}/{name}.json', json_write(config, indent=2))  def apply(nat64) -> None: -    """ """      if not nat64:          return | 
