summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--interface-definitions/include/nat64/protocol.xml.i (renamed from interface-definitions/include/nat64-protocol.xml.i)2
-rw-r--r--interface-definitions/nat64.xml.in2
-rw-r--r--python/vyos/utils/file.py14
-rwxr-xr-xsmoketest/scripts/cli/test_nat64.py102
-rwxr-xr-xsrc/conf_mode/nat64.py25
5 files changed, 108 insertions, 37 deletions
diff --git a/interface-definitions/include/nat64-protocol.xml.i b/interface-definitions/include/nat64/protocol.xml.i
index 9432e6a87..a640873b5 100644
--- a/interface-definitions/include/nat64-protocol.xml.i
+++ b/interface-definitions/include/nat64/protocol.xml.i
@@ -1,4 +1,4 @@
-<!-- include start from nat64-protocol.xml.i -->
+<!-- include start from nat64/protocol.xml.i -->
<node name="protocol">
<properties>
<help>Apply translation address to a specfic protocol</help>
diff --git a/interface-definitions/nat64.xml.in b/interface-definitions/nat64.xml.in
index 1522395e4..baf13e6cb 100644
--- a/interface-definitions/nat64.xml.in
+++ b/interface-definitions/nat64.xml.in
@@ -66,7 +66,7 @@
#include <include/generic-description.xml.i>
#include <include/generic-disable-node.xml.i>
#include <include/nat-translation-port.xml.i>
- #include <include/nat64-protocol.xml.i>
+ #include <include/nat64/protocol.xml.i>
<leafNode name="address">
<properties>
<help>IPv4 address or prefix to translate to</help>
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index e20899fe7..667a2464b 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -83,20 +83,6 @@ def read_json(fname, defaultonfailure=None):
return defaultonfailure
raise e
-def write_json(fname, data, indent=2, defaultonfailure=None):
- """
- encode data to json and write to a file
- should defaultonfailure be not None, it is returned on failure to write
- """
- import json
- try:
- with open(fname, 'w') as f:
- json.dump(data, f, indent=indent)
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
def chown(path, user, group):
""" change file/directory owner """
from pwd import getpwnam
diff --git a/smoketest/scripts/cli/test_nat64.py b/smoketest/scripts/cli/test_nat64.py
new file mode 100755
index 000000000..b5723ac7e
--- /dev/null
+++ b/smoketest/scripts/cli/test_nat64.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import os
+import unittest
+
+from base_vyostest_shim import VyOSUnitTestSHIM
+from vyos.configsession import ConfigSessionError
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
+
+base_path = ['nat64']
+src_path = base_path + ['source']
+
+jool_nat64_config = '/run/jool/instance-100.json'
+
+
+class TestNAT64(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT64, cls).setUpClass()
+
+ # ensure we can also run this test on a live system - so lets clean
+ # out the current configuration :)
+ cls.cli_delete(cls, base_path)
+
+ def tearDown(self):
+ self.cli_delete(base_path)
+ self.cli_commit()
+ self.assertFalse(os.path.exists(jool_nat64_config))
+
+ def test_snat64(self):
+ rule = '100'
+ translation_rule = '10'
+ prefix_v6 = '64:ff9b::/96'
+ pool = '192.0.2.10'
+ pool_port = '1-65535'
+
+ self.cli_set(src_path + ['rule', rule, 'source', 'prefix', prefix_v6])
+ self.cli_set(
+ src_path
+ + ['rule', rule, 'translation', 'pool', translation_rule, 'address', pool]
+ )
+ self.cli_set(
+ src_path
+ + ['rule', rule, 'translation', 'pool', translation_rule, 'port', pool_port]
+ )
+ self.cli_commit()
+
+ # Load the JSON file
+ with open(f'/run/jool/instance-{rule}.json', 'r') as json_file:
+ config_data = json.load(json_file)
+
+ # Assertions based on the content of the JSON file
+ self.assertEqual(config_data['instance'], f'instance-{rule}')
+ self.assertEqual(config_data['framework'], 'netfilter')
+ self.assertEqual(config_data['global']['pool6'], prefix_v6)
+ self.assertTrue(config_data['global']['manually-enabled'])
+
+ # Check the pool4 entries
+ pool4_entries = config_data.get('pool4', [])
+ self.assertIsInstance(pool4_entries, list)
+ self.assertGreater(len(pool4_entries), 0)
+
+ for entry in pool4_entries:
+ self.assertIn('protocol', entry)
+ self.assertIn('prefix', entry)
+ self.assertIn('port range', entry)
+
+ protocol = entry['protocol']
+ prefix = entry['prefix']
+ port_range = entry['port range']
+
+ if protocol == 'ICMP':
+ self.assertEqual(prefix, pool)
+ self.assertEqual(port_range, pool_port)
+ elif protocol == 'UDP':
+ self.assertEqual(prefix, pool)
+ self.assertEqual(port_range, pool_port)
+ elif protocol == 'TCP':
+ self.assertEqual(prefix, pool)
+ self.assertEqual(port_range, pool_port)
+ else:
+ self.fail(f'Unexpected protocol: {protocol}')
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/conf_mode/nat64.py b/src/conf_mode/nat64.py
index d4df479ac..a8b90fb11 100755
--- a/src/conf_mode/nat64.py
+++ b/src/conf_mode/nat64.py
@@ -21,6 +21,7 @@ import os
import re
from ipaddress import IPv6Network
+from json import dumps as json_write
from vyos import ConfigError
from vyos import airbag
@@ -28,7 +29,7 @@ from vyos.config import Config
from vyos.configdict import dict_merge
from vyos.configdict import is_node_changed
from vyos.utils.dict import dict_search
-from vyos.utils.file import write_json
+from vyos.utils.file import write_file
from vyos.utils.kernel import check_kmod
from vyos.utils.process import cmd
from vyos.utils.process import run
@@ -40,27 +41,12 @@ JOOL_CONFIG_DIR = "/run/jool"
def get_config(config: Config | None = None) -> None:
- """ """
if config is None:
config = Config()
base = ["nat64"]
nat64 = config.get_config_dict(base, key_mangling=("-", "_"), get_first_key=True)
- # T2665: we must add the tagNode defaults individually until this is
- # moved to the base class
- for direction in ["source"]:
- if direction in nat64:
- default_values = defaults(base + [direction, "rule"])
- if "rule" in nat64[direction]:
- for rule in nat64[direction]["rule"]:
- nat64[direction]["rule"][rule] = dict_merge(
- default_values, nat64[direction]["rule"][rule]
- )
-
- # Only support netfilter for now
- nat64[direction]["rule"][rule]["mode"] = "netfilter"
-
base_src = base + ["source", "rule"]
# Load in existing instances so we can destroy any unknown
@@ -95,7 +81,6 @@ def get_config(config: Config | None = None) -> None:
def verify(nat64) -> None:
- """ """
if not nat64:
# no need to verify the CLI as nat64 is going to be deactivated
return
@@ -103,7 +88,7 @@ def verify(nat64) -> None:
if dict_search("source.rule", nat64):
# Ensure only 1 netfilter instance per namespace
nf_rules = filter(
- lambda i: "deleted" not in i and i["mode"] == "netfilter",
+ lambda i: "deleted" not in i and i.get('mode') == "netfilter",
nat64["source"]["rule"].values(),
)
next(nf_rules, None) # Discard the first element
@@ -138,7 +123,6 @@ def verify(nat64) -> None:
def generate(nat64) -> None:
- """ """
os.makedirs(JOOL_CONFIG_DIR, exist_ok=True)
if dict_search("source.rule", nat64):
@@ -183,11 +167,10 @@ def generate(nat64) -> None:
if pool4:
config["pool4"] = pool4
- write_json(f"{JOOL_CONFIG_DIR}/{name}.json", config)
+ write_file(f'{JOOL_CONFIG_DIR}/{name}.json', json_write(config, indent=2))
def apply(nat64) -> None:
- """ """
if not nat64:
return