diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/firewall.py | 24 | ||||
-rw-r--r-- | python/vyos/ifconfig/bridge.py | 21 | ||||
-rw-r--r-- | python/vyos/ifconfig/vti.py | 6 | ||||
-rw-r--r-- | python/vyos/pki.py | 61 | ||||
-rw-r--r-- | python/vyos/util.py | 4 |
5 files changed, 98 insertions, 18 deletions
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index 7d1278d0e..3e2de4c3f 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -152,7 +152,10 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name): output.append(f'{ip_name} {prefix}addr {suffix}') if dict_search_args(side_conf, 'geoip', 'country_code'): - output.append(f'{ip_name} {prefix}addr @GEOIP_CC_{fw_name}_{rule_id}') + operator = '' + if dict_search_args(side_conf, 'geoip', 'inverse_match') != None: + operator = '!=' + output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}') if 'mac_address' in side_conf: suffix = side_conf["mac_address"] @@ -429,22 +432,13 @@ def geoip_update(firewall, force=False): # Map country codes to set names for codes, path in dict_search_recursive(firewall, 'country_code'): + set_name = f'GEOIP_CC_{path[1]}_{path[3]}' if path[0] == 'name': - set_name = f'GEOIP_CC_{path[1]}_{path[3]}' - ipv4_sets[set_name] = [] for code in codes: - if code not in ipv4_codes: - ipv4_codes[code] = [set_name] - else: - ipv4_codes[code].append(set_n) + ipv4_codes.setdefault(code, []).append(set_name) elif path[0] == 'ipv6_name': - set_name = f'GEOIP_CC_{path[1]}_{path[3]}' - ipv6_sets[set_name] = [] for code in codes: - if code not in ipv6_codes: - ipv6_codes[code] = [set_name] - else: - ipv6_codes[code].append(set_name) + ipv6_codes.setdefault(code, []).append(set_name) if not ipv4_codes and not ipv6_codes: if force: @@ -459,11 +453,11 @@ def geoip_update(firewall, force=False): if code in ipv4_codes and ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv4_codes[code]: - ipv4_sets[setname].append(ip_range) + ipv4_sets.setdefault(setname, []).append(ip_range) if code in ipv6_codes and not ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv6_codes[code]: - ipv6_sets[setname].append(ip_range) + ipv6_sets.setdefault(setname, []).append(ip_range) render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', { 'ipv4_sets': ipv4_sets, diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py index ffd9c590f..e4db69c1f 100644 --- a/python/vyos/ifconfig/bridge.py +++ b/python/vyos/ifconfig/bridge.py @@ -90,6 +90,10 @@ class BridgeIf(Interface): 'validate': assert_boolean, 'location': '/sys/class/net/{ifname}/bridge/multicast_querier', }, + 'multicast_snooping': { + 'validate': assert_boolean, + 'location': '/sys/class/net/{ifname}/bridge/multicast_snooping', + }, }} _command_set = {**Interface._command_set, **{ @@ -198,6 +202,18 @@ class BridgeIf(Interface): """ self.set_interface('multicast_querier', enable) + def set_multicast_snooping(self, enable): + """ + Enable or disable multicast snooping on the bridge. + + Use enable=1 to enable or enable=0 to disable + + Example: + >>> from vyos.ifconfig import Interface + >>> BridgeIf('br0').set_multicast_snooping(1) + """ + self.set_interface('multicast_snooping', enable) + def add_port(self, interface): """ Add physical interface to bridge (member port) @@ -257,6 +273,11 @@ class BridgeIf(Interface): value = '1' if 'stp' in config else '0' self.set_stp(value) + # enable or disable multicast snooping + tmp = dict_search('igmp.snooping', config) + value = '1' if (tmp != None) else '0' + self.set_multicast_snooping(value) + # enable or disable IGMP querier tmp = dict_search('igmp.querier', config) value = '1' if (tmp != None) else '0' diff --git a/python/vyos/ifconfig/vti.py b/python/vyos/ifconfig/vti.py index c50cd5ce9..dc99d365a 100644 --- a/python/vyos/ifconfig/vti.py +++ b/python/vyos/ifconfig/vti.py @@ -1,4 +1,4 @@ -# Copyright 2021 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2021-2022 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -53,3 +53,7 @@ class VTIIf(Interface): self._cmd(cmd.format(**self.config)) self.set_interface('admin_state', 'down') + + def get_mac(self): + """ Get a synthetic MAC address. """ + return self.get_mac_synthetic() diff --git a/python/vyos/pki.py b/python/vyos/pki.py index fd91fc9bf..cd15e3878 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -332,6 +332,54 @@ def verify_certificate(cert, ca_cert): except InvalidSignature: return False +def verify_crl(crl, ca_cert): + # Verify CRL was signed by specified CA + if ca_cert.subject != crl.issuer: + return False + + ca_public_key = ca_cert.public_key() + try: + if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization): + ca_public_key.verify( + crl.signature, + crl.tbs_certlist_bytes, + padding=padding.PKCS1v15(), + algorithm=crl.signature_hash_algorithm) + elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization): + ca_public_key.verify( + crl.signature, + crl.tbs_certlist_bytes, + algorithm=crl.signature_hash_algorithm) + elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization): + ca_public_key.verify( + crl.signature, + crl.tbs_certlist_bytes, + signature_algorithm=ec.ECDSA(crl.signature_hash_algorithm)) + else: + return False # We cannot verify it + return True + except InvalidSignature: + return False + +def verify_ca_chain(sorted_names, pki_node): + if len(sorted_names) == 1: # Single cert, no chain + return True + + for name in sorted_names: + cert = load_certificate(pki_node[name]['certificate']) + verified = False + for ca_name in sorted_names: + if name == ca_name: + continue + ca_cert = load_certificate(pki_node[ca_name]['certificate']) + if verify_certificate(cert, ca_cert): + verified = True + break + if not verified and name != sorted_names[-1]: + # Only permit top-most certificate to fail verify (e.g. signed by public CA not explicitly in chain) + return False + return True + # Certificate chain def find_parent(cert, ca_certs): @@ -357,3 +405,16 @@ def find_chain(cert, ca_certs): chain.append(parent) return chain + +def sort_ca_chain(ca_names, pki_node): + def ca_cmp(ca_name1, ca_name2, pki_node): + cert1 = load_certificate(pki_node[ca_name1]['certificate']) + cert2 = load_certificate(pki_node[ca_name2]['certificate']) + + if verify_certificate(cert1, cert2): # cert1 is child of cert2 + return -1 + return 1 + + from functools import cmp_to_key + return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node))) + diff --git a/python/vyos/util.py b/python/vyos/util.py index 0d62fbfe9..bee5d7aec 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -197,7 +197,7 @@ def read_file(fname, defaultonfailure=None): return defaultonfailure raise e -def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None): +def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False): """ Write content of data to given fname, should defaultonfailure be not None, it is returned on failure to read. @@ -212,7 +212,7 @@ def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=N try: """ Write a file to string """ bytes = 0 - with open(fname, 'w') as f: + with open(fname, 'w' if not append else 'a') as f: bytes = f.write(data) chown(fname, user, group) chmod(fname, mode) |