summaryrefslogtreecommitdiff
path: root/cloudinit/sources/helpers/netlink.py
blob: c2ad587b05ba7c76b7548abfc8a122cf714cbe69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Author: Tamilmani Manoharan <tamanoha@microsoft.com>
#
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit import log as logging
from cloudinit import util
from collections import namedtuple

import os
import select
import socket
import struct

LOG = logging.getLogger(__name__)

# http://man7.org/linux/man-pages/man7/netlink.7.html
RTMGRP_LINK = 1
NLMSG_NOOP = 1
NLMSG_ERROR = 2
NLMSG_DONE = 3
RTM_NEWLINK = 16
RTM_DELLINK = 17
RTM_GETLINK = 18
RTM_SETLINK = 19
MAX_SIZE = 65535
RTA_DATA_OFFSET = 32
MSG_TYPE_OFFSET = 16
SELECT_TIMEOUT = 60

NLMSGHDR_FMT = "IHHII"
IFINFOMSG_FMT = "BHiII"
NLMSGHDR_SIZE = struct.calcsize(NLMSGHDR_FMT)
IFINFOMSG_SIZE = struct.calcsize(IFINFOMSG_FMT)
RTATTR_START_OFFSET = NLMSGHDR_SIZE + IFINFOMSG_SIZE
RTA_DATA_START_OFFSET = 4
PAD_ALIGNMENT = 4

IFLA_IFNAME = 3
IFLA_OPERSTATE = 16

# https://www.kernel.org/doc/Documentation/networking/operstates.txt
OPER_UNKNOWN = 0
OPER_NOTPRESENT = 1
OPER_DOWN = 2
OPER_LOWERLAYERDOWN = 3
OPER_TESTING = 4
OPER_DORMANT = 5
OPER_UP = 6

RTAAttr = namedtuple('RTAAttr', ['length', 'rta_type', 'data'])
InterfaceOperstate = namedtuple('InterfaceOperstate', ['ifname', 'operstate'])
NetlinkHeader = namedtuple('NetlinkHeader', ['length', 'type', 'flags', 'seq',
                                             'pid'])


class NetlinkCreateSocketError(RuntimeError):
    '''Raised if netlink socket fails during create or bind.'''


def create_bound_netlink_socket():
    '''Creates netlink socket and bind on netlink group to catch interface
    down/up events. The socket will bound only on RTMGRP_LINK (which only
    includes RTM_NEWLINK/RTM_DELLINK/RTM_GETLINK events). The socket is set to
    non-blocking mode since we're only receiving messages.

    :returns: netlink socket in non-blocking mode
    :raises: NetlinkCreateSocketError
    '''
    try:
        netlink_socket = socket.socket(socket.AF_NETLINK,
                                       socket.SOCK_RAW,
                                       socket.NETLINK_ROUTE)
        netlink_socket.bind((os.getpid(), RTMGRP_LINK))
        netlink_socket.setblocking(0)
    except socket.error as e:
        msg = "Exception during netlink socket create: %s" % e
        raise NetlinkCreateSocketError(msg) from e
    LOG.debug("Created netlink socket")
    return netlink_socket


def get_netlink_msg_header(data):
    '''Gets netlink message type and length

    :param: data read from netlink socket
    :returns: netlink message type
    :raises: AssertionError if data is None or data is not >= NLMSGHDR_SIZE
    struct nlmsghdr {
               __u32 nlmsg_len;    /* Length of message including header */
               __u16 nlmsg_type;   /* Type of message content */
               __u16 nlmsg_flags;  /* Additional flags */
               __u32 nlmsg_seq;    /* Sequence number */
               __u32 nlmsg_pid;    /* Sender port ID */
    };
    '''
    assert (data is not None), ("data is none")
    assert (len(data) >= NLMSGHDR_SIZE), (
        "data is smaller than netlink message header")
    msg_len, msg_type, flags, seq, pid = struct.unpack(NLMSGHDR_FMT,
                                                       data[:MSG_TYPE_OFFSET])
    LOG.debug("Got netlink msg of type %d", msg_type)
    return NetlinkHeader(msg_len, msg_type, flags, seq, pid)


def read_netlink_socket(netlink_socket, timeout=None):
    '''Select and read from the netlink socket if ready.

    :param: netlink_socket: specify which socket object to read from
    :param: timeout: specify a timeout value (integer) to wait while reading,
            if none, it will block indefinitely until socket ready for read
    :returns: string of data read (max length = <MAX_SIZE>) from socket,
              if no data read, returns None
    :raises: AssertionError if netlink_socket is None
    '''
    assert (netlink_socket is not None), ("netlink socket is none")
    read_set, _, _ = select.select([netlink_socket], [], [], timeout)
    # Incase of timeout,read_set doesn't contain netlink socket.
    # just return from this function
    if netlink_socket not in read_set:
        return None
    LOG.debug("netlink socket ready for read")
    data = netlink_socket.recv(MAX_SIZE)
    if data is None:
        LOG.error("Reading from Netlink socket returned no data")
    return data


def unpack_rta_attr(data, offset):
    '''Unpack a single rta attribute.

    :param: data: string of data read from netlink socket
    :param: offset: starting offset of RTA Attribute
    :return: RTAAttr object with length, type and data. On error, return None.
    :raises: AssertionError if data is None or offset is not integer.
    '''
    assert (data is not None), ("data is none")
    assert (type(offset) == int), ("offset is not integer")
    assert (offset >= RTATTR_START_OFFSET), (
        "rta offset is less than expected length")
    length = rta_type = 0
    attr_data = None
    try:
        length = struct.unpack_from("H", data, offset=offset)[0]
        rta_type = struct.unpack_from("H", data, offset=offset+2)[0]
    except struct.error:
        return None  # Should mean our offset is >= remaining data

    # Unpack just the attribute's data. Offset by 4 to skip length/type header
    attr_data = data[offset+RTA_DATA_START_OFFSET:offset+length]
    return RTAAttr(length, rta_type, attr_data)


def read_rta_oper_state(data):
    '''Reads Interface name and operational state from RTA Data.

    :param: data: string of data read from netlink socket
    :returns: InterfaceOperstate object containing if_name and oper_state.
              None if data does not contain valid IFLA_OPERSTATE and
              IFLA_IFNAME messages.
    :raises: AssertionError if data is None or length of data is
             smaller than RTATTR_START_OFFSET.
    '''
    assert (data is not None), ("data is none")
    assert (len(data) > RTATTR_START_OFFSET), (
        "length of data is smaller than RTATTR_START_OFFSET")
    ifname = operstate = None
    offset = RTATTR_START_OFFSET
    while offset <= len(data):
        attr = unpack_rta_attr(data, offset)
        if not attr or attr.length == 0:
            break
        # Each attribute is 4-byte aligned. Determine pad length.
        padlen = (PAD_ALIGNMENT -
                  (attr.length % PAD_ALIGNMENT)) % PAD_ALIGNMENT
        offset += attr.length + padlen

        if attr.rta_type == IFLA_OPERSTATE:
            operstate = ord(attr.data)
        elif attr.rta_type == IFLA_IFNAME:
            interface_name = util.decode_binary(attr.data, 'utf-8')
            ifname = interface_name.strip('\0')
    if not ifname or operstate is None:
        return None
    LOG.debug("rta attrs: ifname %s operstate %d", ifname, operstate)
    return InterfaceOperstate(ifname, operstate)


def wait_for_media_disconnect_connect(netlink_socket, ifname):
    '''Block until media disconnect and connect has happened on an interface.
    Listens on netlink socket to receive netlink events and when the carrier
    changes from 0 to 1, it considers event has happened and
    return from this function

    :param: netlink_socket: netlink_socket to receive events
    :param: ifname: Interface name to lookout for netlink events
    :raises: AssertionError if netlink_socket is None or ifname is None.
    '''
    assert (netlink_socket is not None), ("netlink socket is none")
    assert (ifname is not None), ("interface name is none")
    assert (len(ifname) > 0), ("interface name cannot be empty")
    carrier = OPER_UP
    prevCarrier = OPER_UP
    data = bytes()
    LOG.debug("Wait for media disconnect and reconnect to happen")
    while True:
        recv_data = read_netlink_socket(netlink_socket, SELECT_TIMEOUT)
        if recv_data is None:
            continue
        LOG.debug('read %d bytes from socket', len(recv_data))
        data += recv_data
        LOG.debug('Length of data after concat %d', len(data))
        offset = 0
        datalen = len(data)
        while offset < datalen:
            nl_msg = data[offset:]
            if len(nl_msg) < NLMSGHDR_SIZE:
                LOG.debug("Data is smaller than netlink header")
                break
            nlheader = get_netlink_msg_header(nl_msg)
            if len(nl_msg) < nlheader.length:
                LOG.debug("Partial data. Smaller than netlink message")
                break
            padlen = (nlheader.length+PAD_ALIGNMENT-1) & ~(PAD_ALIGNMENT-1)
            offset = offset + padlen
            LOG.debug('offset to next netlink message: %d', offset)
            # Ignore any messages not new link or del link
            if nlheader.type not in [RTM_NEWLINK, RTM_DELLINK]:
                continue
            interface_state = read_rta_oper_state(nl_msg)
            if interface_state is None:
                LOG.debug('Failed to read rta attributes: %s', interface_state)
                continue
            if interface_state.ifname != ifname:
                LOG.debug(
                    "Ignored netlink event on interface %s. Waiting for %s.",
                    interface_state.ifname, ifname)
                continue
            if interface_state.operstate not in [OPER_UP, OPER_DOWN]:
                continue
            prevCarrier = carrier
            carrier = interface_state.operstate
            # check for carrier down, up sequence
            isVnetSwitch = (prevCarrier == OPER_DOWN) and (carrier == OPER_UP)
            if isVnetSwitch:
                LOG.debug("Media switch happened on %s.", ifname)
                return
        data = data[offset:]

# vi: ts=4 expandtab