diff options
Diffstat (limited to 'cloudinit/sources/helpers')
| -rw-r--r-- | cloudinit/sources/helpers/netlink.py | 102 | ||||
| -rw-r--r-- | cloudinit/sources/helpers/tests/test_netlink.py | 74 | 
2 files changed, 163 insertions, 13 deletions
| diff --git a/cloudinit/sources/helpers/netlink.py b/cloudinit/sources/helpers/netlink.py index c2ad587b..e13d6834 100644 --- a/cloudinit/sources/helpers/netlink.py +++ b/cloudinit/sources/helpers/netlink.py @@ -185,6 +185,54 @@ def read_rta_oper_state(data):      return InterfaceOperstate(ifname, operstate) +def wait_for_nic_attach_event(netlink_socket, existing_nics): +    '''Block until a single nic is attached. + +    :param: netlink_socket: netlink_socket to receive events +    :param: existing_nics: List of existing nics so that we can skip them. +    :raises: AssertionError if netlink_socket is none. +    ''' +    LOG.debug("Preparing to wait for nic attach.") +    ifname = None + +    def should_continue_cb(iname, carrier, prevCarrier): +        if iname in existing_nics: +            return True +        nonlocal ifname +        ifname = iname +        return False + +    # We can return even if the operational state of the new nic is DOWN +    # because we set it to UP before doing dhcp. +    read_netlink_messages(netlink_socket, +                          None, +                          [RTM_NEWLINK], +                          [OPER_UP, OPER_DOWN], +                          should_continue_cb) +    return ifname + + +def wait_for_nic_detach_event(netlink_socket): +    '''Block until a single nic is detached and its operational state is down. + +    :param: netlink_socket: netlink_socket to receive events. +    ''' +    LOG.debug("Preparing to wait for nic detach.") +    ifname = None + +    def should_continue_cb(iname, carrier, prevCarrier): +        nonlocal ifname +        ifname = iname +        return False + +    read_netlink_messages(netlink_socket, +                          None, +                          [RTM_DELLINK], +                          [OPER_DOWN], +                          should_continue_cb) +    return ifname + +  def wait_for_media_disconnect_connect(netlink_socket, ifname):      '''Block until media disconnect and connect has happened on an interface.      Listens on netlink socket to receive netlink events and when the carrier @@ -198,10 +246,42 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):      assert (netlink_socket is not None), ("netlink socket is none")      assert (ifname is not None), ("interface name is none")      assert (len(ifname) > 0), ("interface name cannot be empty") + +    def should_continue_cb(iname, carrier, prevCarrier): +        # check for carrier down, up sequence +        isVnetSwitch = (prevCarrier == OPER_DOWN) and (carrier == OPER_UP) +        if isVnetSwitch: +            LOG.debug("Media switch happened on %s.", ifname) +            return False +        return True + +    LOG.debug("Wait for media disconnect and reconnect to happen") +    read_netlink_messages(netlink_socket, +                          ifname, +                          [RTM_NEWLINK, RTM_DELLINK], +                          [OPER_UP, OPER_DOWN], +                          should_continue_cb) + + +def read_netlink_messages(netlink_socket, +                          ifname_filter, +                          rtm_types, +                          operstates, +                          should_continue_callback): +    ''' Reads from the netlink socket until the condition specified by +    the continuation callback is met. + +    :param: netlink_socket: netlink_socket to receive events. +    :param: ifname_filter: if not None, will only listen for this interface. +    :param: rtm_types: Type of netlink events to listen for. +    :param: operstates: Operational states to listen. +    :param: should_continue_callback: Specifies when to stop listening. +    ''' +    if netlink_socket is None: +        raise RuntimeError("Netlink socket is none") +    data = bytes()      carrier = OPER_UP      prevCarrier = OPER_UP -    data = bytes() -    LOG.debug("Wait for media disconnect and reconnect to happen")      while True:          recv_data = read_netlink_socket(netlink_socket, SELECT_TIMEOUT)          if recv_data is None: @@ -223,26 +303,26 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):              padlen = (nlheader.length+PAD_ALIGNMENT-1) & ~(PAD_ALIGNMENT-1)              offset = offset + padlen              LOG.debug('offset to next netlink message: %d', offset) -            # Ignore any messages not new link or del link -            if nlheader.type not in [RTM_NEWLINK, RTM_DELLINK]: +            # Continue if we are not interested in this message. +            if nlheader.type not in rtm_types:                  continue              interface_state = read_rta_oper_state(nl_msg)              if interface_state is None:                  LOG.debug('Failed to read rta attributes: %s', interface_state)                  continue -            if interface_state.ifname != ifname: +            if (ifname_filter is not None and +                    interface_state.ifname != ifname_filter):                  LOG.debug(                      "Ignored netlink event on interface %s. Waiting for %s.", -                    interface_state.ifname, ifname) +                    interface_state.ifname, ifname_filter)                  continue -            if interface_state.operstate not in [OPER_UP, OPER_DOWN]: +            if interface_state.operstate not in operstates:                  continue              prevCarrier = carrier              carrier = interface_state.operstate -            # check for carrier down, up sequence -            isVnetSwitch = (prevCarrier == OPER_DOWN) and (carrier == OPER_UP) -            if isVnetSwitch: -                LOG.debug("Media switch happened on %s.", ifname) +            if not should_continue_callback(interface_state.ifname, +                                            carrier, +                                            prevCarrier):                  return          data = data[offset:] diff --git a/cloudinit/sources/helpers/tests/test_netlink.py b/cloudinit/sources/helpers/tests/test_netlink.py index 10760bd6..cafe3961 100644 --- a/cloudinit/sources/helpers/tests/test_netlink.py +++ b/cloudinit/sources/helpers/tests/test_netlink.py @@ -9,9 +9,10 @@ import codecs  from cloudinit.sources.helpers.netlink import (      NetlinkCreateSocketError, create_bound_netlink_socket, read_netlink_socket,      read_rta_oper_state, unpack_rta_attr, wait_for_media_disconnect_connect, +    wait_for_nic_attach_event, wait_for_nic_detach_event,      OPER_DOWN, OPER_UP, OPER_DORMANT, OPER_LOWERLAYERDOWN, OPER_NOTPRESENT, -    OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_SETLINK, -    RTM_GETLINK, MAX_SIZE) +    OPER_TESTING, OPER_UNKNOWN, RTATTR_START_OFFSET, RTM_NEWLINK, RTM_DELLINK, +    RTM_SETLINK, RTM_GETLINK, MAX_SIZE)  def int_to_bytes(i): @@ -135,6 +136,75 @@ class TestParseNetlinkMessage(CiTestCase):  @mock.patch('cloudinit.sources.helpers.netlink.socket.socket')  @mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket') +class TestNicAttachDetach(CiTestCase): +    with_logs = True + +    def _media_switch_data(self, ifname, msg_type, operstate): +        '''construct netlink data with specified fields''' +        if ifname and operstate is not None: +            data = bytearray(48) +            bytes = ifname.encode("utf-8") +            struct.pack_into("HH4sHHc", data, RTATTR_START_OFFSET, 8, 3, +                             bytes, 5, 16, int_to_bytes(operstate)) +        elif ifname: +            data = bytearray(40) +            bytes = ifname.encode("utf-8") +            struct.pack_into("HH4s", data, RTATTR_START_OFFSET, 8, 3, bytes) +        elif operstate: +            data = bytearray(40) +            struct.pack_into("HHc", data, RTATTR_START_OFFSET, 5, 16, +                             int_to_bytes(operstate)) +        struct.pack_into("=LHHLL", data, 0, len(data), msg_type, 0, 0, 0) +        return data + +    def test_nic_attached_oper_down(self, m_read_netlink_socket, m_socket): +        '''Test for a new nic attached''' +        ifname = "eth0" +        data_op_down = self._media_switch_data(ifname, RTM_NEWLINK, OPER_DOWN) +        m_read_netlink_socket.side_effect = [data_op_down] +        ifread = wait_for_nic_attach_event(m_socket, []) +        self.assertEqual(m_read_netlink_socket.call_count, 1) +        self.assertEqual(ifname, ifread) + +    def test_nic_attached_oper_up(self, m_read_netlink_socket, m_socket): +        '''Test for a new nic attached''' +        ifname = "eth0" +        data_op_up = self._media_switch_data(ifname, RTM_NEWLINK, OPER_UP) +        m_read_netlink_socket.side_effect = [data_op_up] +        ifread = wait_for_nic_attach_event(m_socket, []) +        self.assertEqual(m_read_netlink_socket.call_count, 1) +        self.assertEqual(ifname, ifread) + +    def test_nic_attach_ignore_existing(self, m_read_netlink_socket, m_socket): +        '''Test that we read only the interfaces we are interested in.''' +        data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN) +        data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN) +        m_read_netlink_socket.side_effect = [data_eth0, data_eth1] +        ifread = wait_for_nic_attach_event(m_socket, ["eth0"]) +        self.assertEqual(m_read_netlink_socket.call_count, 2) +        self.assertEqual("eth1", ifread) + +    def test_nic_attach_read_first(self, m_read_netlink_socket, m_socket): +        '''Test that we read only the interfaces we are interested in.''' +        data_eth0 = self._media_switch_data("eth0", RTM_NEWLINK, OPER_DOWN) +        data_eth1 = self._media_switch_data("eth1", RTM_NEWLINK, OPER_DOWN) +        m_read_netlink_socket.side_effect = [data_eth0, data_eth1] +        ifread = wait_for_nic_attach_event(m_socket, ["eth1"]) +        self.assertEqual(m_read_netlink_socket.call_count, 1) +        self.assertEqual("eth0", ifread) + +    def test_nic_detached(self, m_read_netlink_socket, m_socket): +        '''Test for an existing nic detached''' +        ifname = "eth0" +        data_op_down = self._media_switch_data(ifname, RTM_DELLINK, OPER_DOWN) +        m_read_netlink_socket.side_effect = [data_op_down] +        ifread = wait_for_nic_detach_event(m_socket) +        self.assertEqual(m_read_netlink_socket.call_count, 1) +        self.assertEqual(ifname, ifread) + + +@mock.patch('cloudinit.sources.helpers.netlink.socket.socket') +@mock.patch('cloudinit.sources.helpers.netlink.read_netlink_socket')  class TestWaitForMediaDisconnectConnect(CiTestCase):      with_logs = True | 
