diff options
Diffstat (limited to 'python/vyos/interfaceconfig.py')
-rw-r--r-- | python/vyos/interfaceconfig.py | 376 |
1 files changed, 0 insertions, 376 deletions
diff --git a/python/vyos/interfaceconfig.py b/python/vyos/interfaceconfig.py deleted file mode 100644 index b8bfb707e..000000000 --- a/python/vyos/interfaceconfig.py +++ /dev/null @@ -1,376 +0,0 @@ -#!/usr/bin/python3 - -# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. - -import sys -import os -import re -import json -import socket -import subprocess - -dhclient_conf_dir = r'/var/lib/dhcp/dhclient_' - -class Interface: - def __init__(self, ifname=None, type=None): - if not ifname: - raise Exception("interface name required") - if not os.path.exists('/sys/class/net/{0}'.format(ifname)) and not type: - raise Exception("interface {0} not found".format(str(ifname))) - else: - if not os.path.exists('/sys/class/net/{0}'.format(ifname)): - try: - ret = subprocess.check_output(['ip link add dev ' + str(ifname) + ' type ' + type], stderr=subprocess.STDOUT, shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - if "Operation not supported" in str(e.output.decode()): - print(str(e.output.decode())) - sys.exit(0) - - self._ifname = str(ifname) - - - @property - def mtu(self): - return self._mtu - - @mtu.setter - def mtu(self, mtu=None): - if mtu < 68 or mtu > 9000: - raise ValueError("mtu size invalid value") - self._mtu = mtu - try: - ret = subprocess.check_output(['ip link set mtu ' + str(mtu) + ' dev ' + self._ifname], shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - - - @property - def macaddr(self): - return self._macaddr - - @macaddr.setter - def macaddr(self, mac=None): - if not re.search('^[a-f0-9:]{17}$', str(mac)): - raise ValueError("mac address invalid") - self._macaddr = str(mac) - try: - ret = subprocess.check_output(['ip link set address ' + mac + ' ' + self._ifname], shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - - @property - def ifalias(self): - return self._ifalias - - @ifalias.setter - def ifalias(self, ifalias=None): - if not ifalias: - self._ifalias = self._ifname - else: - self._ifalias = str(ifalias) - open('/sys/class/net/{0}/ifalias'.format(self._ifname),'w').write(self._ifalias) - - @property - def linkstate(self): - return self._linkstate - - @linkstate.setter - def linkstate(self, state='up'): - if str(state).lower() == 'up' or str(state).lower() == 'down': - self._linkstate = str(state).lower() - else: - self._linkstate = 'up' - try: - ret = subprocess.check_output(['ip link set dev ' + self._ifname + ' ' + state], shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - - - - def _debug(self, e=None): - """ - export DEBUG=1 to see debug messages - """ - if os.getenv('DEBUG') == '1': - if e: - print ("Exception raised:\ncommand: {0}\nerror code: {1}\nsubprocess output: {2}".format(e.cmd, e.returncode, e.output.decode()) ) - return True - return False - - def get_mtu(self): - try: - ret = subprocess.check_output(['ip -j link list dev ' + self._ifname], shell=True).decode() - a = json.loads(ret)[0] - return a['mtu'] - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def get_macaddr(self): - try: - ret = subprocess.check_output(['ip -j -4 link show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - j = json.loads(ret) - return j[0]['address'] - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def get_alias(self): - return open('/sys/class/net/{0}/ifalias'.format(self._ifname),'r').read() - - def del_alias(self): - open('/sys/class/net/{0}/ifalias'.format(self._ifname),'w').write() - - def get_link_state(self): - """ - returns either up/down or None if it can't find the state - """ - try: - ret = subprocess.check_output(['ip -j link show ' + self._ifname], shell=True).decode() - s = json.loads(ret) - return s[0]['operstate'].lower() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def remove_interface(self): - try: - ret = subprocess.check_output(['ip link del dev ' + self._ifname], shell=True).decode() - return 0 - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def get_ipv4_addr(self): - """ - reads all IPs assigned to an interface and returns it in a list, - or None if no IP address is assigned to the interface - """ - ips = [] - try: - ret = subprocess.check_output(['ip -j -4 addr show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - j = json.loads(ret) - for i in j: - if len(i) != 0: - for addr in i['addr_info']: - ips.append(addr['local']) - return ips - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - - def get_ipv6_addr(self): - """ - reads all IPs assigned to an interface and returns it in a list, - or None if no IP address is assigned to the interface - """ - ips = [] - try: - ret = subprocess.check_output(['ip -j -6 addr show dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - j = json.loads(ret) - for i in j: - if len(i) != 0: - for addr in i['addr_info']: - ips.append(addr['local']) - return ips - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - - def add_ipv4_addr(self, ipaddr=[]): - """ - add addresses on the interface - """ - for ip in ipaddr: - try: - ret = subprocess.check_output(['ip -4 address add ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - return True - - - def del_ipv4_addr(self, ipaddr=[]): - """ - delete addresses on the interface - """ - for ip in ipaddr: - try: - ret = subprocess.check_output(['ip -4 address del ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - return True - - - def add_ipv6_addr(self, ipaddr=[]): - """ - add addresses on the interface - """ - for ip in ipaddr: - try: - ret = subprocess.check_output(['ip -6 address add ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - return True - - - def del_ipv6_addr(self, ipaddr=[]): - """ - delete addresses on the interface - """ - for ip in ipaddr: - try: - ret = subprocess.check_output(['ip -6 address del ' + ip + ' dev ' + self._ifname], stderr=subprocess.STDOUT, shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - return True - - - #### replace dhcpv4/v6 with systemd.networkd? - def set_dhcpv4(self): - conf_file = dhclient_conf_dir + self._ifname + '.conf' - pidfile = dhclient_conf_dir + self._ifname + '.pid' - leasefile = dhclient_conf_dir + self._ifname + '.leases' - - a = [ - '# generated by interface_config.py', - 'option rfc3442-classless-static-routes code 121 = array of unsigned integer 8;', - 'interface \"' + self._ifname + '\" {', - '\tsend host-name \"' + socket.gethostname() +'\";', - '\trequest subnet-mask, broadcast-address, routers, domain-name-servers, rfc3442-classless-static-routes, domain-name, interface-mtu;', - '}' - ] - - cnf = "" - for ln in a: - cnf +=str(ln + "\n") - open(conf_file, 'w').write(cnf) - if os.path.exists(dhclient_conf_dir + self._ifname + '.pid'): - try: - ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - try: - ret = subprocess.check_output(['/sbin/dhclient -4 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self._ifname], shell=True).decode() - return True - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def del_dhcpv4(self): - conf_file = dhclient_conf_dir + self._ifname + '.conf' - pidfile = dhclient_conf_dir + self._ifname + '.pid' - leasefile = dhclient_conf_dir + self._ifname + '.leases' - if not os.path.exists(pidfile): - return 1 - try: - ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode() - return True - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def get_dhcpv4(self): - pidfile = dhclient_conf_dir + self._ifname + '.pid' - if not os.path.exists(pidfile): - print ("no dhcp client running on interface {0}".format(self._ifname)) - return False - else: - pid = open(pidfile, 'r').read() - print("dhclient running on {0} with pid {1}".format(self._ifname, pid)) - return True - - - def set_dhcpv6(self): - conf_file = dhclient_conf_dir + self._ifname + '.v6conf' - pidfile = dhclient_conf_dir + self._ifname + '.v6pid' - leasefile = dhclient_conf_dir + self._ifname + '.v6leases' - a = [ - '# generated by interface_config.py', - 'interface \"' + self._ifname + '\" {', - '\trequest routers, domain-name-servers, domain-name;', - '}' - ] - cnf = "" - for ln in a: - cnf +=str(ln + "\n") - open(conf_file, 'w').write(cnf) - subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self._ifname + '.accept_ra=0']) - if os.path.exists(pidfile): - try: - ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode() - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - try: - ret = subprocess.check_output(['/sbin/dhclient -6 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self._ifname], shell=True).decode() - return True - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def del_dhcpv6(self): - conf_file = dhclient_conf_dir + self._ifname + '.v6conf' - pidfile = dhclient_conf_dir + self._ifname + '.v6pid' - leasefile = dhclient_conf_dir + self._ifname + '.v6leases' - if not os.path.exists(pidfile): - return 1 - try: - ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode() - subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self._ifname + '.accept_ra=1']) - return True - except subprocess.CalledProcessError as e: - if self._debug(): - self._debug(e) - return None - - def get_dhcpv6(self): - pidfile = dhclient_conf_dir + self._ifname + '.v6pid' - if not os.path.exists(pidfile): - print ("no dhcpv6 client running on interface {0}".format(self._ifname)) - return False - else: - pid = open(pidfile, 'r').read() - print("dhclientv6 running on {0} with pid {1}".format(self._ifname, pid)) - return True - - -#### TODO: dhcpv6-pd via dhclient - |