summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/firewall.py24
-rw-r--r--python/vyos/ifconfig/bridge.py21
-rw-r--r--python/vyos/ifconfig/vti.py6
-rw-r--r--python/vyos/pki.py61
-rw-r--r--python/vyos/util.py4
5 files changed, 98 insertions, 18 deletions
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 7d1278d0e..3e2de4c3f 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -152,7 +152,10 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):
output.append(f'{ip_name} {prefix}addr {suffix}')
if dict_search_args(side_conf, 'geoip', 'country_code'):
- output.append(f'{ip_name} {prefix}addr @GEOIP_CC_{fw_name}_{rule_id}')
+ operator = ''
+ if dict_search_args(side_conf, 'geoip', 'inverse_match') != None:
+ operator = '!='
+ output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}')
if 'mac_address' in side_conf:
suffix = side_conf["mac_address"]
@@ -429,22 +432,13 @@ def geoip_update(firewall, force=False):
# Map country codes to set names
for codes, path in dict_search_recursive(firewall, 'country_code'):
+ set_name = f'GEOIP_CC_{path[1]}_{path[3]}'
if path[0] == 'name':
- set_name = f'GEOIP_CC_{path[1]}_{path[3]}'
- ipv4_sets[set_name] = []
for code in codes:
- if code not in ipv4_codes:
- ipv4_codes[code] = [set_name]
- else:
- ipv4_codes[code].append(set_n)
+ ipv4_codes.setdefault(code, []).append(set_name)
elif path[0] == 'ipv6_name':
- set_name = f'GEOIP_CC_{path[1]}_{path[3]}'
- ipv6_sets[set_name] = []
for code in codes:
- if code not in ipv6_codes:
- ipv6_codes[code] = [set_name]
- else:
- ipv6_codes[code].append(set_name)
+ ipv6_codes.setdefault(code, []).append(set_name)
if not ipv4_codes and not ipv6_codes:
if force:
@@ -459,11 +453,11 @@ def geoip_update(firewall, force=False):
if code in ipv4_codes and ipv4:
ip_range = f'{start}-{end}' if start != end else start
for setname in ipv4_codes[code]:
- ipv4_sets[setname].append(ip_range)
+ ipv4_sets.setdefault(setname, []).append(ip_range)
if code in ipv6_codes and not ipv4:
ip_range = f'{start}-{end}' if start != end else start
for setname in ipv6_codes[code]:
- ipv6_sets[setname].append(ip_range)
+ ipv6_sets.setdefault(setname, []).append(ip_range)
render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', {
'ipv4_sets': ipv4_sets,
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index ffd9c590f..e4db69c1f 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -90,6 +90,10 @@ class BridgeIf(Interface):
'validate': assert_boolean,
'location': '/sys/class/net/{ifname}/bridge/multicast_querier',
},
+ 'multicast_snooping': {
+ 'validate': assert_boolean,
+ 'location': '/sys/class/net/{ifname}/bridge/multicast_snooping',
+ },
}}
_command_set = {**Interface._command_set, **{
@@ -198,6 +202,18 @@ class BridgeIf(Interface):
"""
self.set_interface('multicast_querier', enable)
+ def set_multicast_snooping(self, enable):
+ """
+ Enable or disable multicast snooping on the bridge.
+
+ Use enable=1 to enable or enable=0 to disable
+
+ Example:
+ >>> from vyos.ifconfig import Interface
+ >>> BridgeIf('br0').set_multicast_snooping(1)
+ """
+ self.set_interface('multicast_snooping', enable)
+
def add_port(self, interface):
"""
Add physical interface to bridge (member port)
@@ -257,6 +273,11 @@ class BridgeIf(Interface):
value = '1' if 'stp' in config else '0'
self.set_stp(value)
+ # enable or disable multicast snooping
+ tmp = dict_search('igmp.snooping', config)
+ value = '1' if (tmp != None) else '0'
+ self.set_multicast_snooping(value)
+
# enable or disable IGMP querier
tmp = dict_search('igmp.querier', config)
value = '1' if (tmp != None) else '0'
diff --git a/python/vyos/ifconfig/vti.py b/python/vyos/ifconfig/vti.py
index c50cd5ce9..dc99d365a 100644
--- a/python/vyos/ifconfig/vti.py
+++ b/python/vyos/ifconfig/vti.py
@@ -1,4 +1,4 @@
-# Copyright 2021 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2021-2022 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -53,3 +53,7 @@ class VTIIf(Interface):
self._cmd(cmd.format(**self.config))
self.set_interface('admin_state', 'down')
+
+ def get_mac(self):
+ """ Get a synthetic MAC address. """
+ return self.get_mac_synthetic()
diff --git a/python/vyos/pki.py b/python/vyos/pki.py
index fd91fc9bf..cd15e3878 100644
--- a/python/vyos/pki.py
+++ b/python/vyos/pki.py
@@ -332,6 +332,54 @@ def verify_certificate(cert, ca_cert):
except InvalidSignature:
return False
+def verify_crl(crl, ca_cert):
+ # Verify CRL was signed by specified CA
+ if ca_cert.subject != crl.issuer:
+ return False
+
+ ca_public_key = ca_cert.public_key()
+ try:
+ if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization):
+ ca_public_key.verify(
+ crl.signature,
+ crl.tbs_certlist_bytes,
+ padding=padding.PKCS1v15(),
+ algorithm=crl.signature_hash_algorithm)
+ elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization):
+ ca_public_key.verify(
+ crl.signature,
+ crl.tbs_certlist_bytes,
+ algorithm=crl.signature_hash_algorithm)
+ elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization):
+ ca_public_key.verify(
+ crl.signature,
+ crl.tbs_certlist_bytes,
+ signature_algorithm=ec.ECDSA(crl.signature_hash_algorithm))
+ else:
+ return False # We cannot verify it
+ return True
+ except InvalidSignature:
+ return False
+
+def verify_ca_chain(sorted_names, pki_node):
+ if len(sorted_names) == 1: # Single cert, no chain
+ return True
+
+ for name in sorted_names:
+ cert = load_certificate(pki_node[name]['certificate'])
+ verified = False
+ for ca_name in sorted_names:
+ if name == ca_name:
+ continue
+ ca_cert = load_certificate(pki_node[ca_name]['certificate'])
+ if verify_certificate(cert, ca_cert):
+ verified = True
+ break
+ if not verified and name != sorted_names[-1]:
+ # Only permit top-most certificate to fail verify (e.g. signed by public CA not explicitly in chain)
+ return False
+ return True
+
# Certificate chain
def find_parent(cert, ca_certs):
@@ -357,3 +405,16 @@ def find_chain(cert, ca_certs):
chain.append(parent)
return chain
+
+def sort_ca_chain(ca_names, pki_node):
+ def ca_cmp(ca_name1, ca_name2, pki_node):
+ cert1 = load_certificate(pki_node[ca_name1]['certificate'])
+ cert2 = load_certificate(pki_node[ca_name2]['certificate'])
+
+ if verify_certificate(cert1, cert2): # cert1 is child of cert2
+ return -1
+ return 1
+
+ from functools import cmp_to_key
+ return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node)))
+
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 0d62fbfe9..bee5d7aec 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -197,7 +197,7 @@ def read_file(fname, defaultonfailure=None):
return defaultonfailure
raise e
-def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None):
+def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False):
"""
Write content of data to given fname, should defaultonfailure be not None,
it is returned on failure to read.
@@ -212,7 +212,7 @@ def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=N
try:
""" Write a file to string """
bytes = 0
- with open(fname, 'w') as f:
+ with open(fname, 'w' if not append else 'a') as f:
bytes = f.write(data)
chown(fname, user, group)
chmod(fname, mode)