summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/integration_tests/modules/test_hotplug.py94
-rw-r--r--tests/unittests/cmd/devel/test_hotplug_hook.py218
-rw-r--r--tests/unittests/test_net_activators.py135
3 files changed, 407 insertions, 40 deletions
diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py
new file mode 100644
index 00000000..b683566f
--- /dev/null
+++ b/tests/integration_tests/modules/test_hotplug.py
@@ -0,0 +1,94 @@
+import pytest
+import time
+import yaml
+from collections import namedtuple
+
+from tests.integration_tests.instances import IntegrationInstance
+
+USER_DATA = """\
+#cloud-config
+updates:
+ network:
+ when: ['hotplug']
+"""
+
+ip_addr = namedtuple('ip_addr', 'interface state ip4 ip6')
+
+
+def _wait_till_hotplug_complete(client, expected_runs=1):
+ for _ in range(60):
+ log = client.read_from_file('/var/log/cloud-init.log')
+ if log.count('Exiting hotplug handler') == expected_runs:
+ return log
+ time.sleep(1)
+ raise Exception('Waiting for hotplug handler failed')
+
+
+def _get_ip_addr(client):
+ ips = []
+ lines = client.execute('ip --brief addr').split('\n')
+ for line in lines:
+ attributes = line.split()
+ interface, state = attributes[0], attributes[1]
+ ip4_cidr = attributes[2] if len(attributes) > 2 else None
+ ip6_cidr = attributes[3] if len(attributes) > 3 else None
+ ip4 = ip4_cidr.split('/')[0] if ip4_cidr else None
+ ip6 = ip6_cidr.split('/')[0] if ip6_cidr else None
+ ip = ip_addr(interface, state, ip4, ip6)
+ ips.append(ip)
+ return ips
+
+
+@pytest.mark.openstack
+@pytest.mark.user_data(USER_DATA)
+def test_hotplug_add_remove(client: IntegrationInstance):
+ ips_before = _get_ip_addr(client)
+ log = client.read_from_file('/var/log/cloud-init.log')
+ assert 'Exiting hotplug handler' not in log
+
+ # Add new NIC
+ added_ip = client.instance.add_network_interface()
+ _wait_till_hotplug_complete(client)
+ ips_after_add = _get_ip_addr(client)
+ new_addition = [ip for ip in ips_after_add if ip.ip4 == added_ip][0]
+
+ assert len(ips_after_add) == len(ips_before) + 1
+ assert added_ip not in [ip.ip4 for ip in ips_before]
+ assert added_ip in [ip.ip4 for ip in ips_after_add]
+ assert new_addition.state == 'UP'
+
+ netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ config = yaml.safe_load(netplan_cfg)
+ assert new_addition.interface in config['network']['ethernets']
+
+ # Remove new NIC
+ client.instance.remove_network_interface(added_ip)
+ _wait_till_hotplug_complete(client, expected_runs=2)
+ ips_after_remove = _get_ip_addr(client)
+ assert len(ips_after_remove) == len(ips_before)
+ assert added_ip not in [ip.ip4 for ip in ips_after_remove]
+
+ netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ config = yaml.safe_load(netplan_cfg)
+ assert new_addition.interface not in config['network']['ethernets']
+
+
+@pytest.mark.openstack
+def test_no_hotplug_in_userdata(client: IntegrationInstance):
+ ips_before = _get_ip_addr(client)
+ log = client.read_from_file('/var/log/cloud-init.log')
+ assert 'Exiting hotplug handler' not in log
+
+ # Add new NIC
+ client.instance.add_network_interface()
+ _wait_till_hotplug_complete(client)
+ log = client.read_from_file('/var/log/cloud-init.log')
+ assert 'hotplug not enabled for event of type network' in log
+
+ ips_after_add = _get_ip_addr(client)
+ if len(ips_after_add) == len(ips_before) + 1:
+ # We can see the device, but it should not have been brought up
+ new_ip = [ip for ip in ips_after_add if ip not in ips_before][0]
+ assert new_ip.state == 'DOWN'
+ else:
+ assert len(ips_after_add) == len(ips_before)
diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py
new file mode 100644
index 00000000..63d2490e
--- /dev/null
+++ b/tests/unittests/cmd/devel/test_hotplug_hook.py
@@ -0,0 +1,218 @@
+import pytest
+from collections import namedtuple
+from unittest import mock
+from unittest.mock import call
+
+from cloudinit.cmd.devel.hotplug_hook import handle_hotplug
+from cloudinit.distros import Distro
+from cloudinit.event import EventType
+from cloudinit.net.activators import NetworkActivator
+from cloudinit.net.network_state import NetworkState
+from cloudinit.sources import DataSource
+from cloudinit.stages import Init
+
+
+hotplug_args = namedtuple('hotplug_args', 'udevaction, subsystem, devpath')
+FAKE_MAC = '11:22:33:44:55:66'
+
+
+@pytest.yield_fixture
+def mocks():
+ m_init = mock.MagicMock(spec=Init)
+ m_distro = mock.MagicMock(spec=Distro)
+ m_datasource = mock.MagicMock(spec=DataSource)
+ m_datasource.distro = m_distro
+ m_init.datasource = m_datasource
+ m_init.fetch.return_value = m_datasource
+
+ read_sys_net = mock.patch(
+ 'cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe',
+ return_value=FAKE_MAC
+ )
+
+ m_network_state = mock.MagicMock(spec=NetworkState)
+ parse_net = mock.patch(
+ 'cloudinit.cmd.devel.hotplug_hook.parse_net_config_data',
+ return_value=m_network_state
+ )
+
+ m_activator = mock.MagicMock(spec=NetworkActivator)
+ select_activator = mock.patch(
+ 'cloudinit.cmd.devel.hotplug_hook.activators.select_activator',
+ return_value=m_activator
+ )
+
+ sleep = mock.patch('time.sleep')
+
+ read_sys_net.start()
+ parse_net.start()
+ select_activator.start()
+ m_sleep = sleep.start()
+
+ yield namedtuple('mocks', 'm_init m_network_state m_activator m_sleep')(
+ m_init=m_init,
+ m_network_state=m_network_state,
+ m_activator=m_activator,
+ m_sleep=m_sleep,
+ )
+
+ read_sys_net.stop()
+ parse_net.stop()
+ select_activator.stop()
+ sleep.stop()
+
+
+class TestUnsupportedActions:
+ def test_unsupported_subsystem(self, mocks):
+ with pytest.raises(
+ Exception,
+ match='cannot handle events for subsystem: not_real'
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ subsystem='not_real',
+ udevaction='add'
+ )
+
+ def test_unsupported_udevaction(self, mocks):
+ with pytest.raises(ValueError, match='Unknown action: not_real'):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='not_real',
+ subsystem='net'
+ )
+
+
+class TestHotplug:
+ def test_succcessful_add(self, mocks):
+ init = mocks.m_init
+ mocks.m_network_state.iter_interfaces.return_value = [{
+ 'mac_address': FAKE_MAC,
+ }]
+ handle_hotplug(
+ hotplug_init=init,
+ devpath='/dev/fake',
+ udevaction='add',
+ subsystem='net'
+ )
+ init.datasource.update_metadata_if_supported.assert_called_once_with([
+ EventType.HOTPLUG
+ ])
+ mocks.m_activator.bring_up_interface.assert_called_once_with('fake')
+ mocks.m_activator.bring_down_interface.assert_not_called()
+ init._write_to_cache.assert_called_once_with()
+
+ def test_successful_remove(self, mocks):
+ init = mocks.m_init
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ handle_hotplug(
+ hotplug_init=init,
+ devpath='/dev/fake',
+ udevaction='remove',
+ subsystem='net'
+ )
+ init.datasource.update_metadata_if_supported.assert_called_once_with([
+ EventType.HOTPLUG
+ ])
+ mocks.m_activator.bring_down_interface.assert_called_once_with('fake')
+ mocks.m_activator.bring_up_interface.assert_not_called()
+ init._write_to_cache.assert_called_once_with()
+
+ def test_update_event_disabled(self, mocks, caplog):
+ init = mocks.m_init
+ init.update_event_enabled.return_value = False
+ handle_hotplug(
+ hotplug_init=init,
+ devpath='/dev/fake',
+ udevaction='remove',
+ subsystem='net'
+ )
+ assert 'hotplug not enabled for event of type' in caplog.text
+ init.datasource.update_metadata_if_supported.assert_not_called()
+ mocks.m_activator.bring_up_interface.assert_not_called()
+ mocks.m_activator.bring_down_interface.assert_not_called()
+ init._write_to_cache.assert_not_called()
+
+ def test_update_metadata_failed(self, mocks):
+ mocks.m_init.datasource.update_metadata_if_supported.return_value = \
+ False
+ with pytest.raises(
+ RuntimeError, match='Datasource .* not updated for event hotplug'
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='remove',
+ subsystem='net'
+ )
+
+ def test_detect_hotplugged_device_not_detected_on_add(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ with pytest.raises(
+ RuntimeError,
+ match='Failed to detect {} in updated metadata'.format(FAKE_MAC)
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='add',
+ subsystem='net'
+ )
+
+ def test_detect_hotplugged_device_detected_on_remove(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{
+ 'mac_address': FAKE_MAC,
+ }]
+ with pytest.raises(
+ RuntimeError,
+ match='Failed to detect .* in updated metadata'
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='remove',
+ subsystem='net'
+ )
+
+ def test_apply_failed_on_add(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{
+ 'mac_address': FAKE_MAC,
+ }]
+ mocks.m_activator.bring_up_interface.return_value = False
+ with pytest.raises(
+ RuntimeError, match='Failed to bring up device: /dev/fake'
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='add',
+ subsystem='net'
+ )
+
+ def test_apply_failed_on_remove(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ mocks.m_activator.bring_down_interface.return_value = False
+ with pytest.raises(
+ RuntimeError, match='Failed to bring down device: /dev/fake'
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='remove',
+ subsystem='net'
+ )
+
+ def test_retry(self, mocks):
+ with pytest.raises(RuntimeError):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath='/dev/fake',
+ udevaction='add',
+ subsystem='net'
+ )
+ assert mocks.m_sleep.call_count == 5
+ assert mocks.m_sleep.call_args_list == [
+ call(1), call(3), call(5), call(10), call(30)
+ ]
diff --git a/tests/unittests/test_net_activators.py b/tests/unittests/test_net_activators.py
index f11486ff..db825c35 100644
--- a/tests/unittests/test_net_activators.py
+++ b/tests/unittests/test_net_activators.py
@@ -35,32 +35,8 @@ ethernets:
dhcp4: true
"""
-IF_UP_DOWN_AVAILABLE_CALLS = [
- (('ifquery',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
- (('ifup',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
- (('ifdown',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
-]
-
-IF_UP_DOWN_CALL_LIST = [
- ((['ifup', 'eth0'], ), {}),
- ((['ifup', 'eth1'], ), {}),
-]
-
-NETPLAN_AVAILABLE_CALLS = [
- (('netplan',), {'search': ['/usr/sbin', '/sbin'], 'target': None}),
-]
-
NETPLAN_CALL_LIST = [
- ((['netplan', 'apply'], ), {'capture': True}),
-]
-
-NETWORK_MANAGER_AVAILABLE_CALLS = [
- (('nmcli',), {'target': None}),
-]
-
-NETWORK_MANAGER_CALL_LIST = [
- ((['nmcli', 'connection', 'up', 'eth0'], ), {}),
- ((['nmcli', 'connection', 'up', 'eth1'], ), {}),
+ ((['netplan', 'apply'], ), {}),
]
@@ -126,23 +102,54 @@ class TestSearchAndSelect:
select_activator()
-@pytest.mark.parametrize('activator, available_calls, expected_call_list', [
- (IfUpDownActivator, IF_UP_DOWN_AVAILABLE_CALLS, IF_UP_DOWN_CALL_LIST),
- (NetplanActivator, NETPLAN_AVAILABLE_CALLS, NETPLAN_CALL_LIST),
- (NetworkManagerActivator, NETWORK_MANAGER_AVAILABLE_CALLS,
- NETWORK_MANAGER_CALL_LIST),
+IF_UP_DOWN_AVAILABLE_CALLS = [
+ (('ifquery',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
+ (('ifup',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
+ (('ifdown',), {'search': ['/sbin', '/usr/sbin'], 'target': None}),
+]
+
+NETPLAN_AVAILABLE_CALLS = [
+ (('netplan',), {'search': ['/usr/sbin', '/sbin'], 'target': None}),
+]
+
+NETWORK_MANAGER_AVAILABLE_CALLS = [
+ (('nmcli',), {'target': None}),
+]
+
+
+@pytest.mark.parametrize('activator, available_calls', [
+ (IfUpDownActivator, IF_UP_DOWN_AVAILABLE_CALLS),
+ (NetplanActivator, NETPLAN_AVAILABLE_CALLS),
+ (NetworkManagerActivator, NETWORK_MANAGER_AVAILABLE_CALLS),
])
-class TestIfUpDownActivator:
+class TestActivatorsAvailable:
def test_available(
- self, activator, available_calls, expected_call_list, available_mocks
+ self, activator, available_calls, available_mocks
):
activator.available()
assert available_mocks.m_which.call_args_list == available_calls
+
+IF_UP_DOWN_BRING_UP_CALL_LIST = [
+ ((['ifup', 'eth0'], ), {}),
+ ((['ifup', 'eth1'], ), {}),
+]
+
+NETWORK_MANAGER_BRING_UP_CALL_LIST = [
+ ((['nmcli', 'connection', 'up', 'ifname', 'eth0'], ), {}),
+ ((['nmcli', 'connection', 'up', 'ifname', 'eth1'], ), {}),
+]
+
+
+@pytest.mark.parametrize('activator, expected_call_list', [
+ (IfUpDownActivator, IF_UP_DOWN_BRING_UP_CALL_LIST),
+ (NetplanActivator, NETPLAN_CALL_LIST),
+ (NetworkManagerActivator, NETWORK_MANAGER_BRING_UP_CALL_LIST),
+])
+class TestActivatorsBringUp:
@patch('cloudinit.subp.subp', return_value=('', ''))
def test_bring_up_interface(
- self, m_subp, activator, available_calls, expected_call_list,
- available_mocks
+ self, m_subp, activator, expected_call_list, available_mocks
):
activator.bring_up_interface('eth0')
assert len(m_subp.call_args_list) == 1
@@ -150,16 +157,14 @@ class TestIfUpDownActivator:
@patch('cloudinit.subp.subp', return_value=('', ''))
def test_bring_up_interfaces(
- self, m_subp, activator, available_calls, expected_call_list,
- available_mocks
+ self, m_subp, activator, expected_call_list, available_mocks
):
activator.bring_up_interfaces(['eth0', 'eth1'])
assert expected_call_list == m_subp.call_args_list
@patch('cloudinit.subp.subp', return_value=('', ''))
def test_bring_up_all_interfaces_v1(
- self, m_subp, activator, available_calls, expected_call_list,
- available_mocks
+ self, m_subp, activator, expected_call_list, available_mocks
):
network_state = parse_net_config_data(load(V1_CONFIG))
activator.bring_up_all_interfaces(network_state)
@@ -168,10 +173,60 @@ class TestIfUpDownActivator:
@patch('cloudinit.subp.subp', return_value=('', ''))
def test_bring_up_all_interfaces_v2(
- self, m_subp, activator, available_calls, expected_call_list,
- available_mocks
+ self, m_subp, activator, expected_call_list, available_mocks
):
network_state = parse_net_config_data(load(V2_CONFIG))
activator.bring_up_all_interfaces(network_state)
for call in m_subp.call_args_list:
assert call in expected_call_list
+
+
+IF_UP_DOWN_BRING_DOWN_CALL_LIST = [
+ ((['ifdown', 'eth0'], ), {}),
+ ((['ifdown', 'eth1'], ), {}),
+]
+
+NETWORK_MANAGER_BRING_DOWN_CALL_LIST = [
+ ((['nmcli', 'connection', 'down', 'eth0'], ), {}),
+ ((['nmcli', 'connection', 'down', 'eth1'], ), {}),
+]
+
+
+@pytest.mark.parametrize('activator, expected_call_list', [
+ (IfUpDownActivator, IF_UP_DOWN_BRING_DOWN_CALL_LIST),
+ (NetplanActivator, NETPLAN_CALL_LIST),
+ (NetworkManagerActivator, NETWORK_MANAGER_BRING_DOWN_CALL_LIST),
+])
+class TestActivatorsBringDown:
+ @patch('cloudinit.subp.subp', return_value=('', ''))
+ def test_bring_down_interface(
+ self, m_subp, activator, expected_call_list, available_mocks
+ ):
+ activator.bring_down_interface('eth0')
+ assert len(m_subp.call_args_list) == 1
+ assert m_subp.call_args_list[0] == expected_call_list[0]
+
+ @patch('cloudinit.subp.subp', return_value=('', ''))
+ def test_bring_down_interfaces(
+ self, m_subp, activator, expected_call_list, available_mocks
+ ):
+ activator.bring_down_interfaces(['eth0', 'eth1'])
+ assert expected_call_list == m_subp.call_args_list
+
+ @patch('cloudinit.subp.subp', return_value=('', ''))
+ def test_bring_down_all_interfaces_v1(
+ self, m_subp, activator, expected_call_list, available_mocks
+ ):
+ network_state = parse_net_config_data(load(V1_CONFIG))
+ activator.bring_down_all_interfaces(network_state)
+ for call in m_subp.call_args_list:
+ assert call in expected_call_list
+
+ @patch('cloudinit.subp.subp', return_value=('', ''))
+ def test_bring_down_all_interfaces_v2(
+ self, m_subp, activator, expected_call_list, available_mocks
+ ):
+ network_state = parse_net_config_data(load(V2_CONFIG))
+ activator.bring_down_all_interfaces(network_state)
+ for call in m_subp.call_args_list:
+ assert call in expected_call_list