summaryrefslogtreecommitdiff
path: root/python/vyos/ethtool.py
blob: 5e241fc08a9e81f13040f779cfe1e0937f2794c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright 2021-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.  If not, see <http://www.gnu.org/licenses/>.

import os
import re

from json import loads
from vyos.utils.process import popen

# These drivers do not support using ethtool to change the speed, duplex, or
# flow control settings
_drivers_without_speed_duplex_flow = ['vmxnet3', 'virtio_net', 'xen_netfront',
                                      'iavf', 'ice', 'i40e', 'hv_netvsc', 'veth', 'ixgbevf',
                                      'tun']

class Ethtool:
    """
    Class is used to retrive and cache information about an ethernet adapter
    """
    # dictionary containing driver featurs, it will be populated on demand and
    # the content will look like:
    # [{'esp-hw-offload': {'active': False, 'fixed': True, 'requested': False},
    #   'esp-tx-csum-hw-offload': {'active': False,
    #                              'fixed': True,
    #                              'requested': False},
    #   'fcoe-mtu': {'active': False, 'fixed': True, 'requested': False},
    #   'generic-receive-offload': {'active': True,
    #                               'fixed': False,
    #                               'requested': True},
    #   'generic-segmentation-offload': {'active': True,
    #                                    'fixed': False,
    #                                    'requested': True},
    #   'highdma': {'active': True, 'fixed': False, 'requested': True},
    #   'ifname': 'eth0',
    #   'l2-fwd-offload': {'active': False, 'fixed': True, 'requested': False},
    #   'large-receive-offload': {'active': False,
    #                             'fixed': False,
    #                             'requested': False},
    # ...
    _features = { }
    # dictionary containing available interface speed and duplex settings
    # {
    #   '10'  : {'full': '', 'half': ''},
    #   '100' : {'full': '', 'half': ''},
    #   '1000': {'full': ''}
    #  }
    _speed_duplex = {'auto': {'auto': ''}}
    _ring_buffer = None
    _driver_name = None
    _auto_negotiation = False
    _auto_negotiation_supported = None
    _flow_control = None

    def __init__(self, ifname):
        # Get driver used for interface
        out, _ = popen(f'ethtool --driver {ifname}')
        driver = re.search(r'driver:\s(\w+)', out)
        if driver:
            self._driver_name = driver.group(1)

        # Build a dictinary of supported link-speed and dupley settings.
        out, _ = popen(f'ethtool {ifname}')
        reading = False
        pattern = re.compile(r'\d+base.*')
        for line in out.splitlines()[1:]:
            line = line.lstrip()
            if 'Supported link modes:' in line:
                reading = True
            if 'Supported pause frame use:' in line:
                reading = False
            if reading:
                for block in line.split():
                    if pattern.search(block):
                        speed = block.split('base')[0]
                        duplex = block.split('/')[-1].lower()
                        if speed not in self._speed_duplex:
                            self._speed_duplex.update({ speed : {}})
                        if duplex not in self._speed_duplex[speed]:
                            self._speed_duplex[speed].update({ duplex : ''})
            if 'Supports auto-negotiation:' in line:
                # Split the following string: Auto-negotiation: off
                # we are only interested in off or on
                tmp = line.split()[-1]
                self._auto_negotiation_supported = bool(tmp == 'Yes')
            # Only read in if Auto-negotiation is supported
            if self._auto_negotiation_supported and 'Auto-negotiation:' in line:
                # Split the following string: Auto-negotiation: off
                # we are only interested in off or on
                tmp = line.split()[-1]
                self._auto_negotiation = bool(tmp == 'on')

        # Now populate driver features
        out, _ = popen(f'ethtool --json --show-features {ifname}')
        self._features = loads(out)

        # Get information about NIC ring buffers
        out, _ = popen(f'ethtool --json --show-ring {ifname}')
        self._ring_buffer = loads(out)

        # Get current flow control settings, but this is not supported by
        # all NICs (e.g. vmxnet3 does not support is)
        out, err = popen(f'ethtool --json --show-pause {ifname}')
        if not bool(err):
            self._flow_control = loads(out)

    def check_auto_negotiation_supported(self):
        """ Check if the NIC supports changing auto-negotiation """
        return self._auto_negotiation_supported

    def get_auto_negotiation(self):
        return self._auto_negotiation_supported and self._auto_negotiation

    def get_driver_name(self):
        return self._driver_name

    def _get_generic(self, feature):
        """
        Generic method to read self._features and return a tuple for feature
        enabled and feature is fixed.

        In case of a missing key, return "fixed = True and enabled = False"
        """
        active = False
        fixed = True
        if feature in self._features[0]:
            active = bool(self._features[0][feature]['active'])
            fixed = bool(self._features[0][feature]['fixed'])
        return active, fixed

    def get_generic_receive_offload(self):
        return self._get_generic('generic-receive-offload')

    def get_generic_segmentation_offload(self):
        return self._get_generic('generic-segmentation-offload')

    def get_hw_tc_offload(self):
        return self._get_generic('hw-tc-offload')

    def get_large_receive_offload(self):
        return self._get_generic('large-receive-offload')

    def get_scatter_gather(self):
        return self._get_generic('scatter-gather')

    def get_tcp_segmentation_offload(self):
        return self._get_generic('tcp-segmentation-offload')

    def get_ring_buffer_max(self, rx_tx):
        # Configuration of RX/TX ring-buffers is not supported on every device,
        # thus when it's impossible return None
        if rx_tx not in ['rx', 'tx']:
            ValueError('Ring-buffer type must be either "rx" or "tx"')
        return str(self._ring_buffer[0].get(f'{rx_tx}-max', None))

    def get_ring_buffer(self, rx_tx):
        # Configuration of RX/TX ring-buffers is not supported on every device,
        # thus when it's impossible return None
        if rx_tx not in ['rx', 'tx']:
            ValueError('Ring-buffer type must be either "rx" or "tx"')
        return str(self._ring_buffer[0].get(rx_tx, None))

    def check_speed_duplex(self, speed, duplex):
        """ Check if the passed speed and duplex combination is supported by
        the underlaying network adapter. """
        if isinstance(speed, int):
            speed = str(speed)
        if speed != 'auto' and not speed.isdigit():
            raise ValueError(f'Value "{speed}" for speed is invalid!')
        if duplex not in ['auto', 'full', 'half']:
            raise ValueError(f'Value "{duplex}" for duplex is invalid!')

        if self.get_driver_name() in _drivers_without_speed_duplex_flow:
            return False

        if speed in self._speed_duplex:
            if duplex in self._speed_duplex[speed]:
                return True
        return False

    def check_flow_control(self):
        """ Check if the NIC supports flow-control """
        return bool(self._flow_control)

    def get_flow_control(self):
        if self._flow_control == None:
            raise ValueError('Interface does not support changing '\
                             'flow-control settings!')

        return 'on' if bool(self._flow_control[0]['autonegotiate']) else 'off'