# This file is part of cloud-init. See LICENSE file for license information.

import os
import pwd
import unittest

import pytest

from cloudinit import helpers, util
from cloudinit.sources import DataSourceOpenNebula as ds
from tests.unittests.helpers import CiTestCase, mock, populate_dir

# Context variables exercising quoting, whitespace and backslash handling.
# test_context_parser writes them to a context dir and expects the parsed
# metadata to equal this dict exactly.
TEST_VARS = {
    "VAR1": "single",
    "VAR2": "double word",
    "VAR3": "multi\nline\n",
    "VAR4": "'single'",
    "VAR5": "'double word'",
    "VAR6": "'multi\nline\n'",
    "VAR7": "single\\t",
    "VAR8": "double\\tword",
    "VAR9": "multi\\t\nline\n",
    "VAR10": "\\",  # expect '\'
    "VAR11": "'",  # expect '
    "VAR12": "$",  # expect $
}

# context.sh content that the tests expect to raise BrokenContextDiskDir.
INVALID_CONTEXT = ";"
# Plain-text user-data payload used by the test_user_data_* tests.
USER_DATA = "#cloud-config\napt_upgrade: true"
# Public-key template; test_ssh_key fills "%i" with a counter.
SSH_KEY = "ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i"
HOSTNAME = "foo.example.com"
# Value written to the hostname/IP context variables in the tests.
PUBLIC_IP = "10.0.0.3"
# MAC address that the mocked NIC lookups report for the test interface.
MACADDR = "02:00:0a:12:01:01"
# IPv4 address the tests expect when ETH0_IP is empty and the NIC has
# MACADDR (the last four MAC octets 0a:12:01:01 read as decimal).
IP_BY_MACADDR = "10.18.1.1"
# Prefix length the tests expect when ETH0_MASK is missing or empty.
IP4_PREFIX = "24"
IP6_GLOBAL = "2001:db8:1:0:400:c0ff:fea8:1ba"
IP6_ULA = "fd01:dead:beaf:0:400:c0ff:fea8:1ba"
IP6_GW = "2001:db8:1::ffff"
# Explicit ETH0_IP6_PREFIX_LENGTH value; the tests expect "64" when unset.
IP6_PREFIX = "48"

# Dotted module path used as the prefix for mock.patch targets.
DS_PATH = "cloudinit.sources.DataSourceOpenNebula"


class TestOpenNebulaDataSource(CiTestCase):
    parsed_user = None
    allowed_subp = ["bash"]

    def setUp(self):
        """Create temporary paths and stub out ds.switch_user_cmd."""
        super().setUp()
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths(
            {"cloud_dir": self.tmp, "run_dir": self.tmp}
        )

        # Defaults shared by several tests.
        self.ds = ds.DataSourceOpenNebula
        self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
        self.sys_cfg = {"datasource": {"OpenNebula": {"dsmode": "local"}}}

        # Remember the real function so tearDown can put it back.
        self.switch_user_cmd_real = ds.switch_user_cmd

        def _record_user_instead_of_sudo(user):
            # Tests must never invoke 'sudo': just note the requested user
            # and hand back an empty list.
            self.parsed_user = user
            return []

        ds.switch_user_cmd = _record_user_instead_of_sudo

    def tearDown(self):
        """Restore the real ds.switch_user_cmd saved by setUp."""
        real_switch_user_cmd = self.switch_user_cmd_real
        ds.switch_user_cmd = real_switch_user_cmd
        super().tearDown()

    def test_get_data_non_contextdisk(self):
        """get_data() returns a false value when no context is available."""
        # Report no candidate devices so that no CD lookup is attempted;
        # patch.object restores util.find_devs_with even if the test fails.
        with mock.patch.object(util, "find_devs_with", lambda n: []):
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
            self.assertFalse(dsrc.get_data())

    def test_get_data_broken_contextdisk(self):
        """get_data() raises BrokenContextDiskDir for an invalid context."""
        # Report no candidate devices so that no CD lookup is attempted;
        # patch.object restores util.find_devs_with even if the test fails.
        with mock.patch.object(util, "find_devs_with", lambda n: []):
            populate_dir(self.seed_dir, {"context.sh": INVALID_CONTEXT})
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
            self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)

    def test_get_data_invalid_identity(self):
        """get_data() raises when 'parseuser' names a non-existent user."""
        # Find a user name that is absent from the password database.
        invalid_user = "invalid"
        while True:
            try:
                pwd.getpwnam(invalid_user)
            except KeyError:
                break
            invalid_user += "X"

        # Build a separate config instead of mutating the self.sys_cfg
        # fixture through an alias.
        ds_cfg = dict(
            self.sys_cfg["datasource"]["OpenNebula"], parseuser=invalid_user
        )
        sys_cfg = {"datasource": {"OpenNebula": ds_cfg}}

        # Report no candidate devices so that no CD lookup is attempted;
        # patch.object restores util.find_devs_with even if the test fails.
        with mock.patch.object(util, "find_devs_with", lambda n: []):
            populate_context_dir(self.seed_dir, {"KEY1": "val1"})
            dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
            self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)

    def test_get_data(self):
        """get_data() succeeds for a seed dir and reports platform info."""
        # Report no candidate devices so that no CD lookup is attempted;
        # patch.object restores util.find_devs_with even if the test fails.
        with mock.patch.object(util, "find_devs_with", lambda n: []):
            populate_context_dir(self.seed_dir, {"KEY1": "val1"})
            dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
            self.assertTrue(dsrc.get_data())

        self.assertEqual("opennebula", dsrc.cloud_name)
        self.assertEqual("opennebula", dsrc.platform_type)
        self.assertEqual(
            "seed-dir (%s/seed/opennebula)" % self.tmp, dsrc.subplatform
        )

    def test_seed_dir_non_contextdisk(self):
        """Reading an unpopulated seed dir raises NonContextDiskDir."""
        with self.assertRaises(ds.NonContextDiskDir):
            ds.read_context_disk_dir(self.seed_dir, mock.Mock())

    def test_seed_dir_empty1_context(self):
        """A zero-length context.sh gives no user-data and empty metadata."""
        populate_dir(self.seed_dir, {"context.sh": ""})
        parsed = ds.read_context_disk_dir(self.seed_dir, mock.Mock())

        self.assertIsNone(parsed["userdata"])
        self.assertEqual(parsed["metadata"], {})

    def test_seed_dir_empty2_context(self):
        """A context without variables gives no user-data, empty metadata."""
        no_variables = {}
        populate_context_dir(self.seed_dir, no_variables)
        parsed = ds.read_context_disk_dir(self.seed_dir, mock.Mock())

        self.assertIsNone(parsed["userdata"])
        self.assertEqual(parsed["metadata"], {})

    def test_seed_dir_broken_context(self):
        """An invalid context.sh raises BrokenContextDiskDir when read."""
        populate_dir(self.seed_dir, {"context.sh": INVALID_CONTEXT})

        with self.assertRaises(ds.BrokenContextDiskDir):
            ds.read_context_disk_dir(self.seed_dir, mock.Mock())

    def test_context_parser(self):
        """Every TEST_VARS value survives the write/parse round trip."""
        populate_context_dir(self.seed_dir, TEST_VARS)
        parsed = ds.read_context_disk_dir(self.seed_dir, mock.Mock())

        self.assertIn("metadata", parsed)
        self.assertEqual(TEST_VARS, parsed["metadata"])

    def test_ssh_key(self):
        """Newline-separated SSH keys are returned as a 'public-keys' list."""
        expected_keys = ["first key", "second key"]
        for round_no in range(4):
            for var_name in ("SSH_KEY", "SSH_PUBLIC_KEY"):
                # One context dir per (variable, round) combination.
                ctx_dir = os.path.join(
                    self.tmp, "%s-%i" % (var_name, round_no)
                )
                populate_context_dir(
                    ctx_dir, {var_name: "\n".join(expected_keys)}
                )
                parsed = ds.read_context_disk_dir(ctx_dir, mock.Mock())

                self.assertIn("metadata", parsed)
                self.assertIn("public-keys", parsed["metadata"])
                self.assertEqual(
                    expected_keys, parsed["metadata"]["public-keys"]
                )

            # Grow the key list by one entry for the next round.
            expected_keys.append(SSH_KEY % (round_no + 1,))

    def test_user_data_plain(self):
        """Plain user-data is returned unchanged for both variable names."""
        for var_name in ("USER_DATA", "USERDATA"):
            ctx_dir = os.path.join(self.tmp, var_name)
            variables = {var_name: USER_DATA, "USERDATA_ENCODING": ""}
            populate_context_dir(ctx_dir, variables)
            parsed = ds.read_context_disk_dir(ctx_dir, mock.Mock())

            self.assertIn("userdata", parsed)
            self.assertEqual(USER_DATA, parsed["userdata"])

    def test_user_data_encoding_required_for_decode(self):
        """Without USERDATA_ENCODING, base64 user-data stays undecoded."""
        encoded = util.b64e(USER_DATA)
        for var_name in ("USER_DATA", "USERDATA"):
            ctx_dir = os.path.join(self.tmp, var_name)
            populate_context_dir(ctx_dir, {var_name: encoded})
            parsed = ds.read_context_disk_dir(ctx_dir, mock.Mock())

            self.assertIn("userdata", parsed)
            self.assertEqual(encoded, parsed["userdata"])

    def test_user_data_base64_encoding(self):
        """USERDATA_ENCODING=base64 makes the user-data get decoded."""
        for var_name in ("USER_DATA", "USERDATA"):
            ctx_dir = os.path.join(self.tmp, var_name)
            variables = {
                var_name: util.b64e(USER_DATA),
                "USERDATA_ENCODING": "base64",
            }
            populate_context_dir(ctx_dir, variables)
            parsed = ds.read_context_disk_dir(ctx_dir, mock.Mock())

            self.assertIn("userdata", parsed)
            self.assertEqual(USER_DATA, parsed["userdata"])

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_hostname(self, m_get_phys_by_mac):
        """Each hostname-like context variable sets 'local-hostname'."""
        hostname_vars = (
            "SET_HOSTNAME",
            "HOSTNAME",
            "PUBLIC_IP",
            "IP_PUBLIC",
            "ETH0_IP",
        )
        for dev in ("eth0", "ens3"):
            m_get_phys_by_mac.return_value = {MACADDR: dev}
            for var_name in hostname_vars:
                ctx_dir = os.path.join(self.tmp, var_name)
                populate_context_dir(ctx_dir, {var_name: PUBLIC_IP})
                parsed = ds.read_context_disk_dir(ctx_dir, mock.Mock())

                self.assertIn("metadata", parsed)
                self.assertIn("local-hostname", parsed["metadata"])
                self.assertEqual(
                    PUBLIC_IP, parsed["metadata"]["local-hostname"]
                )

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_network_interfaces(self, m_get_phys_by_mac):
        """ETH0_* context variables yield the expected NIC addresses."""
        v4_default = IP_BY_MACADDR + "/" + IP4_PREFIX
        # (context variables, address expected in the NIC's address list)
        cases = (
            # without ETH0_MAC (for older OpenNebula?)
            ({"ETH0_IP": IP_BY_MACADDR}, v4_default),
            # ETH0_IP and ETH0_MAC
            ({"ETH0_IP": IP_BY_MACADDR, "ETH0_MAC": MACADDR}, v4_default),
            # ETH0_IP with empty string and ETH0_MAC, as happens when the
            # Virtual Network contains "AR = [ TYPE = ETHER ]"
            ({"ETH0_IP": "", "ETH0_MAC": MACADDR}, v4_default),
            # ETH0_MASK
            (
                {
                    "ETH0_IP": IP_BY_MACADDR,
                    "ETH0_MAC": MACADDR,
                    "ETH0_MASK": "255.255.0.0",
                },
                IP_BY_MACADDR + "/16",
            ),
            # ETH0_MASK with empty string
            (
                {
                    "ETH0_IP": IP_BY_MACADDR,
                    "ETH0_MAC": MACADDR,
                    "ETH0_MASK": "",
                },
                v4_default,
            ),
            # ETH0_IP6
            (
                {"ETH0_IP6": IP6_GLOBAL, "ETH0_MAC": MACADDR},
                IP6_GLOBAL + "/64",
            ),
            # ETH0_IP6_ULA
            (
                {"ETH0_IP6_ULA": IP6_ULA, "ETH0_MAC": MACADDR},
                IP6_ULA + "/64",
            ),
            # ETH0_IP6 and ETH0_IP6_PREFIX_LENGTH
            (
                {
                    "ETH0_IP6": IP6_GLOBAL,
                    "ETH0_IP6_PREFIX_LENGTH": IP6_PREFIX,
                    "ETH0_MAC": MACADDR,
                },
                IP6_GLOBAL + "/" + IP6_PREFIX,
            ),
            # ETH0_IP6 and ETH0_IP6_PREFIX_LENGTH with empty string
            (
                {
                    "ETH0_IP6": IP6_GLOBAL,
                    "ETH0_IP6_PREFIX_LENGTH": "",
                    "ETH0_MAC": MACADDR,
                },
                IP6_GLOBAL + "/64",
            ),
        )

        for dev in ("eth0", "ens3"):
            m_get_phys_by_mac.return_value = {MACADDR: dev}

            for context, address in cases:
                # Hand over a fresh dict on every run, exactly as a
                # separate dict literal per call would.
                populate_context_dir(self.seed_dir, dict(context))
                parsed = ds.read_context_disk_dir(self.seed_dir, mock.Mock())

                self.assertIn("network-interfaces", parsed)
                nic_cfg = parsed["network-interfaces"]["ethernets"][dev]
                self.assertIn(address, nic_cfg["addresses"])

    def test_find_candidates(self):
        """find_candidate_devs() returns the devices found per criterion."""
        devs_by_criteria = {
            "LABEL=CONTEXT": ["/dev/sdb"],
            "LABEL=CDROM": ["/dev/sr0"],
            "TYPE=iso9660": ["/dev/vdb"],
        }

        def my_devs_with(criteria):
            # Return a new list on each call so callers never share state.
            return list(devs_by_criteria.get(criteria, []))

        # patch.object restores util.find_devs_with even if the test fails.
        with mock.patch.object(util, "find_devs_with", my_devs_with):
            self.assertEqual(
                ["/dev/sdb", "/dev/sr0", "/dev/vdb"], ds.find_candidate_devs()
            )


@mock.patch(DS_PATH + ".net.get_interfaces_by_mac", mock.Mock(return_value={}))
class TestOpenNebulaNetwork(unittest.TestCase):

    system_nics = ("eth0", "ens3")

    def test_context_devname(self):
        """context_devname maps each context MAC to its ETHn device name."""
        mac_to_dev = {
            "02:00:0a:12:01:01": "ETH0",
            "02:00:0a:12:0f:0f": "ETH1",
        }
        # Context holds the inverse view: "<DEV>_MAC" -> MAC address.
        context = {dev + "_MAC": mac for mac, dev in mac_to_dev.items()}
        net = ds.OpenNebulaNetwork(context, mock.Mock())
        self.assertEqual(mac_to_dev, net.context_devname)

    def test_get_nameservers(self):
        """
        get_nameservers('eth0') returns the DNS server addresses and the
        search domains configured in the context.
        """
        net = ds.OpenNebulaNetwork(
            {
                "DNS": "1.2.3.8",
                "ETH0_DNS": "1.2.3.6 1.2.3.7",
                "ETH0_SEARCH_DOMAIN": "example.com example.org",
            },
            mock.Mock(),
        )
        nameservers = net.get_nameservers("eth0")
        self.assertEqual(
            {
                "addresses": ["1.2.3.6", "1.2.3.7", "1.2.3.8"],
                "search": ["example.com", "example.org"],
            },
            nameservers,
        )

    def test_get_mtu(self):
        """get_mtu('eth0') returns the ETH0_MTU context value."""
        net = ds.OpenNebulaNetwork({"ETH0_MTU": "1280"}, mock.Mock())
        self.assertEqual("1280", net.get_mtu("eth0"))

    def test_get_ip(self):
        """get_ip('eth0', mac) returns the ETH0_IP context value."""
        net = ds.OpenNebulaNetwork({"ETH0_IP": PUBLIC_IP}, mock.Mock())
        self.assertEqual(PUBLIC_IP, net.get_ip("eth0", MACADDR))

    def test_get_ip_emptystring(self):
        """
        get_ip('eth0', mac) falls back to the address built from the MAC
        address when ETH0_IP is an empty string.
        """
        net = ds.OpenNebulaNetwork({"ETH0_IP": ""}, mock.Mock())
        self.assertEqual(IP_BY_MACADDR, net.get_ip("eth0", MACADDR))

    def test_get_ip6(self):
        """
        get_ip6('eth0') returns only the ETH0_IP6 address when
        ETH0_IP6_ULA is empty.
        """
        net = ds.OpenNebulaNetwork(
            {"ETH0_IP6": IP6_GLOBAL, "ETH0_IP6_ULA": ""}, mock.Mock()
        )
        self.assertEqual([IP6_GLOBAL], net.get_ip6("eth0"))

    def test_get_ip6_ula(self):
        """
        get_ip6('eth0') returns only the ETH0_IP6_ULA address when
        ETH0_IP6 is empty.
        """
        net = ds.OpenNebulaNetwork(
            {"ETH0_IP6": "", "ETH0_IP6_ULA": IP6_ULA}, mock.Mock()
        )
        self.assertEqual([IP6_ULA], net.get_ip6("eth0"))

    def test_get_ip6_dual(self):
        """
        get_ip6('eth0') returns the ETH0_IP6 address followed by the
        ETH0_IP6_ULA address when both are set.
        """
        net = ds.OpenNebulaNetwork(
            {"ETH0_IP6": IP6_GLOBAL, "ETH0_IP6_ULA": IP6_ULA}, mock.Mock()
        )
        self.assertEqual([IP6_GLOBAL, IP6_ULA], net.get_ip6("eth0"))

    def test_get_ip6_prefix(self):
        """get_ip6_prefix('eth0') returns ETH0_IP6_PREFIX_LENGTH."""
        net = ds.OpenNebulaNetwork(
            {"ETH0_IP6_PREFIX_LENGTH": IP6_PREFIX}, mock.Mock()
        )
        self.assertEqual(IP6_PREFIX, net.get_ip6_prefix("eth0"))

    def test_get_ip6_prefix_emptystring(self):
        """
        get_ip6_prefix('eth0') returns the default '64' when
        ETH0_IP6_PREFIX_LENGTH is an empty string.
        """
        net = ds.OpenNebulaNetwork(
            {"ETH0_IP6_PREFIX_LENGTH": ""}, mock.Mock()
        )
        self.assertEqual("64", net.get_ip6_prefix("eth0"))

    def test_get_gateway(self):
        """get_gateway('eth0') returns the ETH0_GATEWAY context value."""
        gateway = "1.2.3.5"
        net = ds.OpenNebulaNetwork({"ETH0_GATEWAY": gateway}, mock.Mock())
        self.assertEqual("1.2.3.5", net.get_gateway("eth0"))

    def test_get_gateway6(self):
        """
        get_gateway6('eth0') returns the IPv6 gateway for either of the
        ETH0_GATEWAY6 and ETH0_IP6_GATEWAY context variables.
        """
        for suffix in ("GATEWAY6", "IP6_GATEWAY"):
            context = {"ETH0_" + suffix: IP6_GW}
            net = ds.OpenNebulaNetwork(context, mock.Mock())
            self.assertEqual(IP6_GW, net.get_gateway6("eth0"))

    def test_get_mask(self):
        """get_mask('eth0') returns the ETH0_MASK context value."""
        netmask = "255.255.0.0"
        net = ds.OpenNebulaNetwork({"ETH0_MASK": netmask}, mock.Mock())
        self.assertEqual("255.255.0.0", net.get_mask("eth0"))

    def test_get_mask_emptystring(self):
        """
        get_mask('eth0') returns the default '255.255.255.0' when
        ETH0_MASK is an empty string.
        """
        net = ds.OpenNebulaNetwork({"ETH0_MASK": ""}, mock.Mock())
        self.assertEqual("255.255.255.0", net.get_mask("eth0"))

    def test_get_network(self):
        """get_network('eth0', mac) returns the ETH0_NETWORK value."""
        network = "1.2.3.0"
        net = ds.OpenNebulaNetwork({"ETH0_NETWORK": network}, mock.Mock())
        self.assertEqual("1.2.3.0", net.get_network("eth0", MACADDR))

    def test_get_network_emptystring(self):
        """
        get_network('eth0', mac) falls back to the network address built
        from the MAC address when ETH0_NETWORK is an empty string.
        """
        net = ds.OpenNebulaNetwork({"ETH0_NETWORK": ""}, mock.Mock())
        self.assertEqual("10.18.1.0", net.get_network("eth0", MACADDR))

    def test_get_field(self):
        """get_field('device', 'name') returns the *context* value."""
        net = ds.OpenNebulaNetwork({"ETH9_DUMMY": "DUMMY_VALUE"}, mock.Mock())
        self.assertEqual("DUMMY_VALUE", net.get_field("eth9", "dummy"))

    def test_get_field_withdefaultvalue(self):
        """
        get_field('device', 'name', 'default value') still returns the
        *context* value when one is set.
        """
        net = ds.OpenNebulaNetwork({"ETH9_DUMMY": "DUMMY_VALUE"}, mock.Mock())
        field = net.get_field("eth9", "dummy", "DEFAULT_VALUE")
        self.assertEqual("DUMMY_VALUE", field)

    def test_get_field_withdefaultvalue_emptycontext(self):
        """
        get_field('device', 'name', 'default value') returns the *default*
        value when the context value is an empty string.
        """
        net = ds.OpenNebulaNetwork({"ETH9_DUMMY": ""}, mock.Mock())
        field = net.get_field("eth9", "dummy", "DEFAULT_VALUE")
        self.assertEqual("DEFAULT_VALUE", field)

    def test_get_field_emptycontext(self):
        """
        Verify get_field('device', 'name') returns None if context value is
        empty string.
        """
        context = {"ETH9_DUMMY": ""}
        net = ds.OpenNebulaNetwork(context, mock.Mock())
        val = net.get_field("eth9", "dummy")
        # Identity check: an object that merely compares equal to None
        # must not satisfy this test.
        self.assertIsNone(val)

    def test_get_field_nonecontext(self):
        """
        Verify get_field('device', 'name') returns None if context value is
        None.
        """
        context = {"ETH9_DUMMY": None}
        net = ds.OpenNebulaNetwork(context, mock.Mock())
        val = net.get_field("eth9", "dummy")
        # Identity check: an object that merely compares equal to None
        # must not satisfy this test.
        self.assertIsNone(val)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_gen_conf_gateway(self, m_get_phys_by_mac):
        """gen_conf() renders 'gateway4' only for a non-empty ETH0_GATEWAY."""
        self.maxDiff = None
        # (ETH0_GATEWAY value, extra keys expected in the NIC config)
        scenarios = (
            ("", {}),  # empty ETH0_GATEWAY
            ("1.2.3.5", {"gateway4": "1.2.3.5"}),  # set ETH0_GATEWAY
        )
        for gateway, extra in scenarios:
            context = {
                "ETH0_MAC": "02:00:0a:12:01:01",
                "ETH0_GATEWAY": gateway,
            }
            for nic in self.system_nics:
                nic_cfg = dict(
                    extra,
                    match={"macaddress": MACADDR},
                    addresses=[IP_BY_MACADDR + "/" + IP4_PREFIX],
                )
                expected = {"version": 2, "ethernets": {nic: nic_cfg}}
                m_get_phys_by_mac.return_value = {MACADDR: nic}
                net = ds.OpenNebulaNetwork(context, mock.Mock())
                self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_gen_conf_gateway6(self, m_get_phys_by_mac):
        """gen_conf() renders 'gateway6' only for a non-empty ETH0_GATEWAY6."""
        self.maxDiff = None
        # (ETH0_GATEWAY6 value, extra keys expected in the NIC config)
        scenarios = (
            ("", {}),  # empty ETH0_GATEWAY6
            (IP6_GW, {"gateway6": IP6_GW}),  # set ETH0_GATEWAY6
        )
        for gateway6, extra in scenarios:
            context = {
                "ETH0_MAC": "02:00:0a:12:01:01",
                "ETH0_GATEWAY6": gateway6,
            }
            for nic in self.system_nics:
                nic_cfg = dict(
                    extra,
                    match={"macaddress": MACADDR},
                    addresses=[IP_BY_MACADDR + "/" + IP4_PREFIX],
                )
                expected = {"version": 2, "ethernets": {nic: nic_cfg}}
                m_get_phys_by_mac.return_value = {MACADDR: nic}
                net = ds.OpenNebulaNetwork(context, mock.Mock())
                self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_gen_conf_ipv6address(self, m_get_phys_by_mac):
        """gen_conf() adds IPv6 addresses only when they are non-empty."""
        self.maxDiff = None
        v4_address = IP_BY_MACADDR + "/" + IP4_PREFIX
        # (context, addresses expected in the NIC config)
        scenarios = (
            # empty ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH
            (
                {
                    "ETH0_MAC": "02:00:0a:12:01:01",
                    "ETH0_IP6": "",
                    "ETH0_IP6_ULA": "",
                    "ETH0_IP6_PREFIX_LENGTH": "",
                },
                [v4_address],
            ),
            # set ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH
            (
                {
                    "ETH0_MAC": "02:00:0a:12:01:01",
                    "ETH0_IP6": IP6_GLOBAL,
                    "ETH0_IP6_PREFIX_LENGTH": IP6_PREFIX,
                    "ETH0_IP6_ULA": IP6_ULA,
                },
                [
                    v4_address,
                    IP6_GLOBAL + "/" + IP6_PREFIX,
                    IP6_ULA + "/" + IP6_PREFIX,
                ],
            ),
        )
        for context, addresses in scenarios:
            for nic in self.system_nics:
                nic_cfg = {
                    "match": {"macaddress": MACADDR},
                    "addresses": list(addresses),
                }
                expected = {"version": 2, "ethernets": {nic: nic_cfg}}
                m_get_phys_by_mac.return_value = {MACADDR: nic}
                net = ds.OpenNebulaNetwork(context, mock.Mock())
                self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_gen_conf_dns(self, m_get_phys_by_mac):
        """gen_conf() renders 'nameservers' only for non-empty DNS values."""
        self.maxDiff = None
        # (context, extra keys expected in the NIC config)
        scenarios = (
            # empty DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN
            (
                {
                    "ETH0_MAC": "02:00:0a:12:01:01",
                    "DNS": "",
                    "ETH0_DNS": "",
                    "ETH0_SEARCH_DOMAIN": "",
                },
                {},
            ),
            # set DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN
            (
                {
                    "ETH0_MAC": "02:00:0a:12:01:01",
                    "DNS": "1.2.3.8",
                    "ETH0_DNS": "1.2.3.6 1.2.3.7",
                    "ETH0_SEARCH_DOMAIN": "example.com example.org",
                },
                {
                    "nameservers": {
                        "addresses": ["1.2.3.6", "1.2.3.7", "1.2.3.8"],
                        "search": ["example.com", "example.org"],
                    }
                },
            ),
        )
        for context, extra in scenarios:
            for nic in self.system_nics:
                nic_cfg = dict(
                    extra,
                    match={"macaddress": MACADDR},
                    addresses=[IP_BY_MACADDR + "/" + IP4_PREFIX],
                )
                expected = {"version": 2, "ethernets": {nic: nic_cfg}}
                m_get_phys_by_mac.return_value = {MACADDR: nic}
                net = ds.OpenNebulaNetwork(context, mock.Mock())
                self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_gen_conf_mtu(self, m_get_phys_by_mac):
        """gen_conf() renders 'mtu' only for a non-empty ETH0_MTU."""
        self.maxDiff = None
        # (ETH0_MTU value, extra keys expected in the NIC config)
        scenarios = (
            ("", {}),  # empty ETH0_MTU
            ("1280", {"mtu": "1280"}),  # set ETH0_MTU
        )
        for mtu, extra in scenarios:
            context = {
                "ETH0_MAC": "02:00:0a:12:01:01",
                "ETH0_MTU": mtu,
            }
            for nic in self.system_nics:
                nic_cfg = dict(
                    extra,
                    match={"macaddress": MACADDR},
                    addresses=[IP_BY_MACADDR + "/" + IP4_PREFIX],
                )
                expected = {"version": 2, "ethernets": {nic: nic_cfg}}
                m_get_phys_by_mac.return_value = {MACADDR: nic}
                net = ds.OpenNebulaNetwork(context, mock.Mock())
                self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_eth0(self, m_get_phys_by_mac):
        """An empty context still renders the NIC reported by the system."""
        for nic in self.system_nics:
            m_get_phys_by_mac.return_value = {MACADDR: nic}
            net = ds.OpenNebulaNetwork({}, mock.Mock())
            nic_cfg = {
                "match": {"macaddress": MACADDR},
                "addresses": [IP_BY_MACADDR + "/" + IP4_PREFIX],
            }
            expected = {"version": 2, "ethernets": {nic: nic_cfg}}

            self.assertEqual(net.gen_conf(), expected)

    @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
    def test_distro_passed_through(self, m_get_physical_nics_by_mac):
        """The distro argument is handed to get_physical_nics_by_mac."""
        distro = mock.sentinel.distro
        ds.OpenNebulaNetwork({}, distro)
        # Exactly one call, carrying the distro as its only argument.
        m_get_physical_nics_by_mac.assert_called_once_with(distro)

    def test_eth0_override(self):
        """Explicit IPv4 context values override the MAC-derived defaults."""
        self.maxDiff = None
        context = {
            "DNS": "1.2.3.8",
            "ETH0_DNS": "1.2.3.6 1.2.3.7",
            "ETH0_GATEWAY": "1.2.3.5",
            "ETH0_GATEWAY6": "",
            "ETH0_IP": IP_BY_MACADDR,
            "ETH0_IP6": "",
            "ETH0_IP6_PREFIX_LENGTH": "",
            "ETH0_IP6_ULA": "",
            "ETH0_MAC": "02:00:0a:12:01:01",
            "ETH0_MASK": "255.255.0.0",
            "ETH0_MTU": "",
            "ETH0_NETWORK": "10.18.0.0",
            "ETH0_SEARCH_DOMAIN": "",
        }
        for nic in self.system_nics:
            # The NIC mapping is passed in directly instead of being looked
            # up on the system.
            nics_by_mac = {MACADDR: nic}
            net = ds.OpenNebulaNetwork(
                context, mock.Mock(), system_nics_by_mac=nics_by_mac
            )
            nic_cfg = {
                "match": {"macaddress": MACADDR},
                "addresses": [IP_BY_MACADDR + "/16"],
                "gateway4": "1.2.3.5",
                "nameservers": {
                    "addresses": ["1.2.3.6", "1.2.3.7", "1.2.3.8"]
                },
            }
            expected = {"version": 2, "ethernets": {nic: nic_cfg}}

            self.assertEqual(expected, net.gen_conf())

    def test_eth0_v4v6_override(self):
        """Render one nic from a context with IPv4 and IPv6 values.

        The expected config lists the IPv4 address plus the global and ULA
        IPv6 addresses, both gateways, DNS servers, search domains and MTU.
        """
        self.maxDiff = None
        context = {
            "DNS": "1.2.3.8",
            "ETH0_DNS": "1.2.3.6 1.2.3.7",
            "ETH0_GATEWAY": "1.2.3.5",
            "ETH0_GATEWAY6": IP6_GW,
            "ETH0_IP": IP_BY_MACADDR,
            "ETH0_IP6": IP6_GLOBAL,
            "ETH0_IP6_PREFIX_LENGTH": IP6_PREFIX,
            "ETH0_IP6_ULA": IP6_ULA,
            "ETH0_MAC": "02:00:0a:12:01:01",
            "ETH0_MASK": "255.255.0.0",
            "ETH0_MTU": "1280",
            "ETH0_NETWORK": "10.18.0.0",
            "ETH0_SEARCH_DOMAIN": "example.com example.org",
        }
        # Both IPv6 addresses share the prefix length given in the context.
        v6_suffix = "/" + IP6_PREFIX
        addresses = [
            IP_BY_MACADDR + "/16",
            IP6_GLOBAL + v6_suffix,
            IP6_ULA + v6_suffix,
        ]
        nameservers = {
            "addresses": ["1.2.3.6", "1.2.3.7", "1.2.3.8"],
            "search": ["example.com", "example.org"],
        }
        nic_config = {
            "match": {"macaddress": MACADDR},
            "addresses": addresses,
            "gateway4": "1.2.3.5",
            "gateway6": IP6_GW,
            "nameservers": nameservers,
            "mtu": "1280",
        }
        for nic_name in self.system_nics:
            network = ds.OpenNebulaNetwork(
                context, mock.Mock(), system_nics_by_mac={MACADDR: nic_name}
            )
            wanted = {"version": 2, "ethernets": {nic_name: nic_config}}

            self.assertEqual(wanted, network.gen_conf())

    def test_multiple_nics(self):
        """Render two nics whose system names differ from the context names.

        ETH0 is matched by MAC to "enp1s2" and ETH3 to "enp0s25". ETH0 has
        an empty IPv6 prefix length in the context and /64 is expected.
        """
        self.maxDiff = None
        mac_enp0s25 = "02:00:0a:12:01:01"
        mac_enp1s2 = "02:00:0a:12:01:02"
        context = {
            "DNS": "1.2.3.8",
            "ETH0_DNS": "1.2.3.6 1.2.3.7",
            "ETH0_GATEWAY": "1.2.3.5",
            "ETH0_GATEWAY6": IP6_GW,
            "ETH0_IP": "10.18.1.1",
            "ETH0_IP6": IP6_GLOBAL,
            "ETH0_IP6_PREFIX_LENGTH": "",
            "ETH0_IP6_ULA": IP6_ULA,
            "ETH0_MAC": mac_enp1s2,
            "ETH0_MASK": "255.255.0.0",
            "ETH0_MTU": "1280",
            "ETH0_NETWORK": "10.18.0.0",
            "ETH0_SEARCH_DOMAIN": "example.com",
            "ETH3_DNS": "10.3.1.2",
            "ETH3_GATEWAY": "10.3.0.1",
            "ETH3_GATEWAY6": "",
            "ETH3_IP": "10.3.1.3",
            "ETH3_IP6": "",
            "ETH3_IP6_PREFIX_LENGTH": "",
            "ETH3_IP6_ULA": "",
            "ETH3_MAC": mac_enp0s25,
            "ETH3_MASK": "255.255.0.0",
            "ETH3_MTU": "",
            "ETH3_NETWORK": "10.3.0.0",
            "ETH3_SEARCH_DOMAIN": "third.example.com third.example.org",
        }
        system_nics = {mac_enp0s25: "enp0s25", mac_enp1s2: "enp1s2"}
        network = ds.OpenNebulaNetwork(
            context, mock.Mock(), system_nics_by_mac=system_nics
        )

        # Expected entry for the nic described by the ETH0_* variables.
        enp1s2_config = {
            "match": {"macaddress": mac_enp1s2},
            "addresses": [
                "10.18.1.1/16",
                IP6_GLOBAL + "/64",
                IP6_ULA + "/64",
            ],
            "gateway4": "1.2.3.5",
            "gateway6": IP6_GW,
            "nameservers": {
                "addresses": ["1.2.3.6", "1.2.3.7", "1.2.3.8"],
                "search": ["example.com"],
            },
            "mtu": "1280",
        }
        # Expected entry for the nic described by the ETH3_* variables.
        enp0s25_config = {
            "match": {"macaddress": mac_enp0s25},
            "addresses": ["10.3.1.3/16"],
            "gateway4": "10.3.0.1",
            "nameservers": {
                "addresses": ["10.3.1.2", "1.2.3.8"],
                "search": ["third.example.com", "third.example.org"],
            },
        }
        wanted = {
            "version": 2,
            "ethernets": {
                "enp1s2": enp1s2_config,
                "enp0s25": enp0s25_config,
            },
        }

        self.assertEqual(wanted, network.gen_conf())


class TestParseShellConfig:
    """Checks for ds.parse_shell_config."""

    @pytest.mark.allow_subp_for("bash")
    def test_no_seconds(self):
        """A SECONDS assignment in the input is not part of the result."""
        # A 'sleep 2' would be another way to test this, at the price of a
        # slower test run.
        shell_lines = ("foo=bar", "SECONDS=2", "xx=foo")
        parsed = ds.parse_shell_config("\n".join(shell_lines))
        wanted = {"foo": "bar", "xx": "foo"}
        assert parsed == wanted


class TestGetPhysicalNicsByMac:
    """Checks for ds.get_physical_nics_by_mac."""

    @pytest.mark.parametrize(
        "interfaces_by_mac,physical_devs,expected_return",
        [
            # Nothing on the system: nothing comes back.
            ({}, [], {}),
            # A lone virtual interface is dropped.
            ({"mac1": "virtual0"}, [], {}),
            # A lone physical interface is kept.
            ({"mac2": "physical0"}, ["physical0"], {"mac2": "physical0"}),
            # Mixed input: just the physical interface is kept.
            (
                {"mac3": "physical1", "mac4": "virtual1"},
                ["physical1"],
                {"mac3": "physical1"},
            ),
        ],
    )
    def test(self, interfaces_by_mac, physical_devs, expected_return):
        """Only interfaces the distro reports as physical are returned."""

        def is_physical(devname):
            # A device counts as physical when the test case lists it.
            return devname in physical_devs

        distro = mock.Mock()
        distro.networking.is_physical.side_effect = is_physical
        patched_lookup = mock.patch(
            DS_PATH + ".net.get_interfaces_by_mac",
            return_value=interfaces_by_mac,
        )
        with patched_lookup:
            found = ds.get_physical_nics_by_mac(distro)
        assert expected_return == found


def populate_context_dir(path, variables):
    """Build the text of a context.sh and pass it to populate_dir for *path*.

    Each name in *variables* is upper-cased and its value is written
    inside single quotes, one assignment per line, after a header comment.
    """
    header = "# Context variables generated by OpenNebula\n"
    assignments = [
        # A quote inside a value becomes '\'' so the quoting stays balanced.
        "%s='%s'\n" % (name.upper(), value.replace(r"'", r"'\''"))
        for name, value in variables.items()
    ]
    populate_dir(path, {"context.sh": header + "".join(assignments)})


# vi: ts=4 expandtab