summaryrefslogtreecommitdiff
path: root/cloudinit/sources/helpers
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources/helpers')
-rw-r--r--cloudinit/sources/helpers/netlink.py102
-rw-r--r--cloudinit/sources/helpers/tests/test_netlink.py74
2 files changed, 163 insertions, 13 deletions
diff --git a/cloudinit/sources/helpers/netlink.py b/cloudinit/sources/helpers/netlink.py
index c2ad587b..e13d6834 100644
--- a/cloudinit/sources/helpers/netlink.py
+++ b/cloudinit/sources/helpers/netlink.py
@@ -185,6 +185,54 @@ def read_rta_oper_state(data):
return InterfaceOperstate(ifname, operstate)
+def wait_for_nic_attach_event(netlink_socket, existing_nics):
+ '''Block until a single nic is attached.
+
+ :param: netlink_socket: netlink_socket to receive events
+ :param: existing_nics: List of existing nics so that we can skip them.
+ :raises: AssertionError if netlink_socket is none.
+ '''
+ LOG.debug("Preparing to wait for nic attach.")
+ ifname = None
+
+ def should_continue_cb(iname, carrier, prevCarrier):
+ if iname in existing_nics:
+ return True
+ nonlocal ifname
+ ifname = iname
+ return False
+
+ # We can return even if the operational state of the new nic is DOWN
+ # because we set it to UP before doing dhcp.
+ read_netlink_messages(netlink_socket,
+ None,
+ [RTM_NEWLINK],
+ [OPER_UP, OPER_DOWN],
+ should_continue_cb)
+ return ifname
+
+
+def wait_for_nic_detach_event(netlink_socket):
+ '''Block until a single nic is detached and its operational state is down.
+
+ :param: netlink_socket: netlink_socket to receive events.
+ '''
+ LOG.debug("Preparing to wait for nic detach.")
+ ifname = None
+
+ def should_continue_cb(iname, carrier, prevCarrier):
+ nonlocal ifname
+ ifname = iname
+ return False
+
+ read_netlink_messages(netlink_socket,
+ None,
+ [RTM_DELLINK],
+ [OPER_DOWN],
+ should_continue_cb)
+ return ifname
+
+
def wait_for_media_disconnect_connect(netlink_socket, ifname):
'''Block until media disconnect and connect has happened on an interface.
Listens on netlink socket to receive netlink events and when the carrier
@@ -198,10 +246,42 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):
assert (netlink_socket is not None), ("netlink socket is none")
assert (ifname is not None), ("interface name is none")
assert (len(ifname) > 0), ("interface name cannot be empty")
+
+ def should_continue_cb(iname, carrier, prevCarrier):
+ # check for carrier down, up sequence
+ isVnetSwitch = (prevCarrier == OPER_DOWN) and (carrier == OPER_UP)
+ if isVnetSwitch:
+ LOG.debug("Media switch happened on %s.", ifname)
+ return False
+ return True
+
+ LOG.debug("Wait for media disconnect and reconnect to happen")
+ read_netlink_messages(netlink_socket,
+ ifname,
+ [RTM_NEWLINK, RTM_DELLINK],
+ [OPER_UP, OPER_DOWN],
+ should_continue_cb)
+
+
+def read_netlink_messages(netlink_socket,
+ ifname_filter,
+ rtm_types,
+ operstates,
+ should_continue_callback):
+ ''' Reads from the netlink socket until the condition specified by
+ the continuation callback is met.
+
+ :param: netlink_socket: netlink_socket to receive events.
+ :param: ifname_filter: if not None, will only listen for this interface.
+ :param: rtm_types: Type of netlink events to listen for.
+ :param: operstates: Operational states to listen.
+ :param: should_continue_callback: Specifies when to stop listening.
+ '''
+ if netlink_socket is None:
+ raise RuntimeError("Netlink socket is none")
+ data = bytes()
carrier = OPER_UP
prevCarrier = OPER_UP
- data = bytes()
- LOG.debug("Wait for media disconnect and reconnect to happen")
while True:
recv_data = read_netlink_socket(netlink_socket, SELECT_TIMEOUT)
if recv_data is None:
@@ -223,26 +303,26 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):
padlen = (nlheader.length+PAD_ALIGNMENT-1) & ~(PAD_ALIGNMENT-1)
offset = offset + padlen
LOG.debug('offset to next netlink message: %d', offset)
- # Ignore any messages not new link or del link
- if nlheader.type not in [RTM_NEWLINK, RTM_DELLINK]:
+ # Continue if we are not interested in this message.
+ if nlheader.type not in rtm_types:
continue
interface_state = read_rta_oper_state(nl_msg)
if interface_state is None:
LOG.debug('Failed to read rta attributes: %s', interface_state)
continue
- if interface_state.ifname != ifname:
+ if (ifname_filter is not None and
+ interface_state.ifname != ifname_filter):
LOG.debug(
"Ignored netlink event on interface %s. Waiting for %s.",
- interface_state.ifname, ifname)
+ interface_state.ifname, ifname_filter)
continue
- if interface_state.operstate not in [OPER_UP, OPER_DOWN]:
+ if interface_state.operstate not in operstates:
continue
prevCarrier = carrier
carrier = interface_state.operstate
- # check for carrier down, up sequence
- isVnetSwitch = (prevCarrier == OPER_DOWN) and (carrier == OPER_UP)
- if isVnetSwitch:
- LOG.debug("Media switch happened on %s.", ifname)
+ if not should_continue_callback(interface_state.ifname,
+ carrier,
+ prevCarrier):
return
data = data[offset:]
diff --git a/cloudinit/sources/helpers/tests/test_netlink.py b/cloudinit/sources/helpers/tests/test_netlink.py
index 10760bd6..cafe3961 100644
--- a/cloudinit/sources/helpers/tests/test_netlink.py
+++ b/cloudinit/sources/helpers/tests/test_netlink.py
@@ -9,9 +9,10 @@ import codecs
from cloudinit.sources.helpers.netlink import (
NetlinkCreateSocketError, create_bound_netlink_socket, read_netlink_socket,
read_rta_oper_state, unpack_rta_attr, wait_for_media_disconnect_connect,
+ wait_for_nic_attach_event, wait_for_nic_detach_event,
OPER_DOWN, OPER_UP, OPER_DORMANT, OPER_LOWERLAYERDOWN, OPER_NOTPRESENT,
- OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_SETLINK,
- RTM_GETLINK, MAX_SIZE)
+ OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_DELLINK,
+ RTM_SETLINK, RTM_GETLINK, MAX_SIZE)
def int_to_bytes(i):
@@ -135,6 +136,75 @@ class TestParseNetlinkMessage(CiTestCase):
@mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket')
+class TestNicAttachDetach(CiTestCase):
+ with_logs = True
+
+ def _media_switch_data(self, ifname, msg_type, operstate):
+ '''construct netlink data with specified fields'''
+ if ifname and operstate is not None:
+ data = bytearray(48)
+ bytes = ifname.encode("utf-8")
+ struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3,
+ bytes, 5, 16, int_to_bytes(operstate))
+ elif ifname:
+ data = bytearray(40)
+ bytes = ifname.encode("utf-8")
+ struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes)
+ elif operstate:
+ data = bytearray(40)
+ struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16,
+ int_to_bytes(operstate))
+ struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0)
+ return data
+
+ def test_nic_attached_oper_down(self, m_read_netlink_socket, m_socket):
+ '''Test for a new nic attached'''
+ ifname = "eth0"
+ data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN)
+ m_read_netlink_socket.side_effect = [data_op_down]
+ ifread = wait_for_nic_attach_event(m_socket, [])
+ self.assertEqual(m_read_netlink_socket.call_count, 1)
+ self.assertEqual(ifname, ifread)
+
+ def test_nic_attached_oper_up(self, m_read_netlink_socket, m_socket):
+ '''Test for a new nic attached'''
+ ifname = "eth0"
+ data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP)
+ m_read_netlink_socket.side_effect = [data_op_up]
+ ifread = wait_for_nic_attach_event(m_socket, [])
+ self.assertEqual(m_read_netlink_socket.call_count, 1)
+ self.assertEqual(ifname, ifread)
+
+ def test_nic_attach_ignore_existing(self, m_read_netlink_socket, m_socket):
+ '''Test that we read only the interfaces we are interested in.'''
+ data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN)
+ data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN)
+ m_read_netlink_socket.side_effect = [data_eth0, data_eth1]
+ ifread = wait_for_nic_attach_event(m_socket, ["eth0"])
+ self.assertEqual(m_read_netlink_socket.call_count, 2)
+ self.assertEqual("eth1", ifread)
+
+ def test_nic_attach_read_first(self, m_read_netlink_socket, m_socket):
+ '''Test that we read only the interfaces we are interested in.'''
+ data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN)
+ data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN)
+ m_read_netlink_socket.side_effect = [data_eth0, data_eth1]
+ ifread = wait_for_nic_attach_event(m_socket, ["eth1"])
+ self.assertEqual(m_read_netlink_socket.call_count, 1)
+ self.assertEqual("eth0", ifread)
+
+ def test_nic_detached(self, m_read_netlink_socket, m_socket):
+ '''Test for an existing nic detached'''
+ ifname = "eth0"
+ data_op_down = self._media_switch_data(ifname, RTM_DELLINK, OPER_DOWN)
+ m_read_netlink_socket.side_effect = [data_op_down]
+ ifread = wait_for_nic_detach_event(m_socket)
+ self.assertEqual(m_read_netlink_socket.call_count, 1)
+ self.assertEqual(ifname, ifread)
+
+
+@mock.patch('cloudinit.sources.helpers.netlink.socket.socket')
+@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket')
class TestWaitForMediaDisconnectConnect(CiTestCase):
with_logs = True