#!/usr/bin/python3 # Copyright 2019 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . import sys import os import json import socket import subprocess dhclient_conf_dir = r'/var/lib/dhcp/dhclient_' class Interface(): def __init__(self, ifname=None, type=None): if not ifname: raise Exception("interface name required") if not os.path.exists('/sys/class/net/{0}'.format(ifname)) and not type: raise Exception("interface {0} not found".format(str(ifname))) else: if not os.path.exists('/sys/class/net/{0}'.format(ifname)): try: ret = subprocess.check_output(['ip link add dev ' + str(ifname) + ' type ' + type], stderr=subprocess.STDOUT, shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) if "Operation not supported" in str(e.output.decode()): print(str(e.output.decode())) sys.exit(0) self.ifname = str(ifname) def _debug(self, e=None): """ export DEBUG=1 to see debug messages """ if os.getenv('DEBUG') == '1': if e: print ("Exception raised:\ncommand: {0}\nerror code: {1}\nsubprocess output: {2}".format(e.cmd, e.returncode, e.output.decode()) ) return True return False def set_alias(self, alias=None): if not alias: open('/sys/class/net/{0}/ifalias'.format(self.ifname),'w').write(self.ifname) else: open('/sys/class/net/{0}/ifalias'.format(self.ifname),'w').write(str(alias)) def get_alias(self): return open('/sys/class/net/{0}/ifalias'.format(self.ifname),'r').read() def del_alias(self): open('/sys/class/net/{0}/ifalias'.format(self.ifname),'w').write() def set_link_state(self, state="up"): if state.lower() == 'up' or state.lower() == 'down': try: ret = subprocess.check_output(['ip link set dev ' + self.ifname + ' ' + state], shell=True).decode() return 0 except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return 1 else: raise ValueError("value can only be up or down") def get_link_state(self): """ returns either up/down or None if it can't find the state """ try: ret = subprocess.check_output(['ip -j link show ' + self.ifname], shell=True).decode() s = json.loads(ret) return s[0]['operstate'].lower() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def remove_interface(self): try: ret = subprocess.check_output(['ip link del dev ' + self.ifname], shell=True).decode() return 0 except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def set_macaddr(self, mac=None): # ip will give us an error if the mac is invalid try: ret = subprocess.check_output(['ip link set address ' + mac + ' ' + self.ifname], shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def get_macaddr(self): try: ret = subprocess.check_output(['ip -j -4 link show dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() j = json.loads(ret) return j[0]['address'] except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def get_ipv4_addr(self): """ reads all IPs assigned to an interface and returns it in a list, or None if no IP address is assigned to the interface """ ips = [] try: ret = subprocess.check_output(['ip -j -4 addr show dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() j = json.loads(ret) for i in j: if len(i) != 0: for addr in i['addr_info']: ips.append(addr['local']) return ips except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def get_ipv6_addr(self): """ reads all IPs assigned to an interface and returns it in a list, or None if no IP address is assigned to the interface """ ips = [] try: ret = subprocess.check_output(['ip -j -6 addr show dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() j = json.loads(ret) for i in j: if len(i) != 0: for addr in i['addr_info']: ips.append(addr['local']) return ips except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def add_ipv4_addr(self, ipaddr=[]): """ add addresses on the interface """ for ip in ipaddr: try: ret = subprocess.check_output(['ip -4 address add ' + ip + ' dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None return True def del_ipv4_addr(self, ipaddr=[]): """ delete addresses on the interface """ for ip in ipaddr: try: ret = subprocess.check_output(['ip -4 address del ' + ip + ' dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None return True def add_ipv6_addr(self, ipaddr=[]): """ add addresses on the interface """ for ip in ipaddr: try: ret = subprocess.check_output(['ip -6 address add ' + ip + ' dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None return True def del_ipv6_addr(self, ipaddr=[]): """ delete addresses on the interface """ for ip in ipaddr: try: ret = subprocess.check_output(['ip -6 address del ' + ip + ' dev ' + self.ifname], stderr=subprocess.STDOUT, shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None return True def get_mtu(self): try: ret = subprocess.check_output(['ip -j link list dev ' + self.ifname], shell=True).decode() a = json.loads(ret)[0] return a['mtu'] except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def set_mtu(self, mtu=None): if not mtu: return None try: ret = subprocess.check_output(['ip link set mtu ' + str(mtu) + ' dev ' + self.ifname], shell=True).decode() return True except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None #### replace dhcpv4/v6 with systemd.networkd? def set_dhcpv4(self): conf_file = dhclient_conf_dir + self.ifname + '.conf' pidfile = dhclient_conf_dir + self.ifname + '.pid' leasefile = dhclient_conf_dir + self.ifname + '.leases' a = [ '# generated by interface_config.py', 'option rfc3442-classless-static-routes code 121 = array of unsigned integer 8;', 'interface \"' + self.ifname + '\" {', '\tsend host-name \"' + socket.gethostname() +'\";', '\trequest subnet-mask, broadcast-address, routers, domain-name-servers, rfc3442-classless-static-routes, domain-name, interface-mtu;', '}' ] cnf = "" for ln in a: cnf +=str(ln + "\n") open(conf_file, 'w').write(cnf) if os.path.exists(dhclient_conf_dir + self.ifname + '.pid'): try: ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) try: ret = subprocess.check_output(['/sbin/dhclient -4 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self.ifname], shell=True).decode() return True except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def del_dhcpv4(self): conf_file = dhclient_conf_dir + self.ifname + '.conf' pidfile = dhclient_conf_dir + self.ifname + '.pid' leasefile = dhclient_conf_dir + self.ifname + '.leases' if not os.path.exists(pidfile): return 1 try: ret = subprocess.check_output(['/sbin/dhclient -4 -r -pf ' + pidfile], shell=True).decode() return True except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def get_dhcpv4(self): pidfile = dhclient_conf_dir + self.ifname + '.pid' if not os.path.exists(pidfile): print ("no dhcp client running on interface {0}".format(self.ifname)) return False else: pid = open(pidfile, 'r').read() print("dhclient running on {0} with pid {1}".format(self.ifname, pid)) return True def set_dhcpv6(self): conf_file = dhclient_conf_dir + self.ifname + '.v6conf' pidfile = dhclient_conf_dir + self.ifname + '.v6pid' leasefile = dhclient_conf_dir + self.ifname + '.v6leases' a = [ '# generated by interface_config.py', 'interface \"' + self.ifname + '\" {', '\trequest routers, domain-name-servers, domain-name;', '}' ] cnf = "" for ln in a: cnf +=str(ln + "\n") open(conf_file, 'w').write(cnf) subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self.ifname + '.accept_ra=0']) if os.path.exists(pidfile): try: ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode() except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) try: ret = subprocess.check_output(['/sbin/dhclient -6 -q -nw -cf ' + conf_file + ' -pf ' + pidfile + ' -lf ' + leasefile + ' ' + self.ifname], shell=True).decode() return True except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def del_dhcpv6(self): conf_file = dhclient_conf_dir + self.ifname + '.v6conf' pidfile = dhclient_conf_dir + self.ifname + '.v6pid' leasefile = dhclient_conf_dir + self.ifname + '.v6leases' if not os.path.exists(pidfile): return 1 try: ret = subprocess.check_output(['/sbin/dhclient -6 -q -x -pf ' + pidfile], shell=True).decode() subprocess.call(['sysctl', '-q', '-w', 'net.ipv6.conf.' + self.ifname + '.accept_ra=1']) return True except subprocess.CalledProcessError as e: if self._debug(): self._debug(e) return None def get_dhcpv6(self): pidfile = dhclient_conf_dir + self.ifname + '.v6pid' if not os.path.exists(pidfile): print ("no dhcpv6 client running on interface {0}".format(self.ifname)) return False else: pid = open(pidfile, 'r').read() print("dhclientv6 running on {0} with pid {1}".format(self.ifname, pid)) return True #### TODO: dhcpv6-pd via dhclient