#!/usr/bin/env python3 # # Copyright (C) 2019-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import unittest import tempfile import random import string from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.process import process_running DDCLIENT_SYSTEMD_UNIT = '/run/systemd/system/ddclient.service.d/override.conf' DDCLIENT_CONF = '/run/ddclient/ddclient.conf' DDCLIENT_PID = '/run/ddclient/ddclient.pid' DDCLIENT_PNAME = 'ddclient' base_path = ['service', 'dns', 'dynamic'] name_path = base_path + ['name'] server = 'ddns.vyos.io' hostname = 'test.ddns.vyos.io' zone = 'vyos.io' username = 'vyos_user' password = 'paSS_@4ord' ttl = '300' interface = 'eth0' class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def setUp(self): # Always start with a clean CLI instance self.cli_delete(base_path) def tearDown(self): # Check for running process self.assertTrue(process_running(DDCLIENT_PID)) # Delete DDNS configuration self.cli_delete(base_path) self.cli_commit() # PID file must no londer exist after process exited self.assertFalse(os.path.exists(DDCLIENT_PID)) # IPv4 standard DDNS service configuration def test_01_dyndns_service_standard(self): services = {'cloudflare': {'protocol': 'cloudflare'}, 'freedns': {'protocol': 'freedns', 'username': username}, 'zoneedit': {'protocol': 'zoneedit1', 'username': username}} for svc, details in services.items(): self.cli_set(name_path + [svc, 'address', interface]) self.cli_set(name_path + [svc, 'host-name', hostname]) self.cli_set(name_path + [svc, 'password', password]) self.cli_set(name_path + [svc, 'zone', zone]) self.cli_set(name_path + [svc, 'ttl', ttl]) for opt, value in details.items(): self.cli_set(name_path + [svc, opt, value]) # 'zone' option is supported and required by 'cloudfare', but not 'freedns' and 'zoneedit' self.cli_set(name_path + [svc, 'zone', zone]) if details['protocol'] == 'cloudflare': pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [svc, 'zone']) # 'ttl' option is supported by 'cloudfare', but not 'freedns' and 'zoneedit' self.cli_set(name_path + [svc, 'ttl', ttl]) if details['protocol'] == 'cloudflare': pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [svc, 'ttl']) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') # default value 300 seconds self.assertIn(f'daemon=300', ddclient_conf) self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) for opt in details.keys(): if opt == 'username': login = details[opt] self.assertIn(f'login={login}', ddclient_conf) else: tmp = details[opt] self.assertIn(f'{opt}={tmp}', ddclient_conf) # IPv6 only DDNS service configuration def test_02_dyndns_service_ipv6(self): interval = '60' svc_path = name_path + ['dynv6'] proto = 'dyndns2' ip_version = 'ipv6' wait_time = '600' expiry_time_good = '3600' expiry_time_bad = '360' self.cli_set(base_path + ['interval', interval]) self.cli_set(svc_path + ['address', interface]) self.cli_set(svc_path + ['ip-version', ip_version]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['username', username]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', hostname]) self.cli_set(svc_path + ['wait-time', wait_time]) # expiry-time must be greater than wait-time, exception is raised otherwise with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['expiry-time', expiry_time_bad]) self.cli_commit() self.cli_set(svc_path + ['expiry-time', expiry_time_good]) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'daemon={interval}', ddclient_conf) self.assertIn(f'usev6=ifv6', ddclient_conf) self.assertIn(f'ifv6={interface}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'login={username}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'min-interval={wait_time}', ddclient_conf) self.assertIn(f'max-interval={expiry_time_good}', ddclient_conf) # IPv4+IPv6 dual DDNS service configuration def test_03_dyndns_service_dual_stack(self): services = {'cloudflare': {'protocol': 'cloudflare', 'zone': zone}, 'freedns': {'protocol': 'freedns', 'username': username}, 'google': {'protocol': 'googledomains', 'username': username}} ip_version = 'both' for name, details in services.items(): self.cli_set(name_path + [name, 'address', interface]) self.cli_set(name_path + [name, 'host-name', hostname]) self.cli_set(name_path + [name, 'password', password]) for opt, value in details.items(): self.cli_set(name_path + [name, opt, value]) # Dual stack is supported by 'cloudfare' and 'freedns' but not 'googledomains' # exception is raised for unsupported ones self.cli_set(name_path + [name, 'ip-version', ip_version]) if details['protocol'] not in ['cloudflare', 'freedns']: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [name, 'ip-version']) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') if details['protocol'] not in ['cloudflare', 'freedns']: self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) else: self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'usev6=ifv6', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) self.assertIn(f'ifv6={interface}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) for opt in details.keys(): if opt == 'username': login = details[opt] self.assertIn(f'login={login}', ddclient_conf) else: tmp = details[opt] self.assertIn(f'{opt}={tmp}', ddclient_conf) def test_04_dyndns_rfc2136(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['vyos'] proto = 'nsupdate' with tempfile.NamedTemporaryFile(prefix='/config/auth/') as key_file: key_file.write(b'S3cretKey') self.cli_set(svc_path + ['address', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['key', key_file.name]) self.cli_set(svc_path + ['ttl', ttl]) self.cli_set(svc_path + ['host-name', hostname]) # commit changes self.cli_commit() # Check some generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'use=if', ddclient_conf) self.assertIn(f'if={interface}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'zone={zone}', ddclient_conf) self.assertIn(f'password=\'{key_file.name}\'', ddclient_conf) self.assertIn(f'ttl={ttl}', ddclient_conf) def test_05_dyndns_hostname(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['namecheap'] proto = 'namecheap' hostnames = ['@', 'www', hostname, f'@.{hostname}'] for name in hostnames: self.cli_set(svc_path + ['address', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['username', username]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', name]) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'login={username}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'{name}', ddclient_conf) def test_06_dyndns_web_options(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['cloudflare'] proto = 'cloudflare' web_url_good = 'https://ifconfig.me/ip' web_url_bad = 'http:/ifconfig.me/ip' self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', hostname]) self.cli_set(svc_path + ['web-options', 'url', web_url_good]) # web-options is supported only with web service based address lookup # exception is raised for interface based address lookup with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['address', interface]) self.cli_commit() self.cli_set(svc_path + ['address', 'web']) # commit changes self.cli_commit() # web-options must be a valid URL with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['web-options', 'url', web_url_bad]) self.cli_commit() self.cli_set(svc_path + ['web-options', 'url', web_url_good]) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'usev4=webv4', ddclient_conf) self.assertIn(f'webv4={web_url_good}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'zone={zone}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'{hostname}', ddclient_conf) def test_07_dyndns_vrf(self): # Table number randomized, but should be within range 100-65535 vrf_table = "".join(random.choices(string.digits, k=4)) vrf_name = f'vyos-test-{vrf_table}' svc_path = name_path + ['cloudflare'] proto = 'cloudflare' self.cli_set(['vrf', 'name', vrf_name, 'table', vrf_table]) self.cli_set(base_path + ['vrf', vrf_name]) self.cli_set(svc_path + ['address', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['host-name', hostname]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['password', password]) # commit changes self.cli_commit() # Check for process in VRF systemd_override = cmd(f'cat {DDCLIENT_SYSTEMD_UNIT}') self.assertIn(f'ExecStart=ip vrf exec {vrf_name} /usr/bin/ddclient -file {DDCLIENT_CONF}', systemd_override) # Check for process in VRF proc = cmd(f'ip vrf pids {vrf_name}') self.assertIn(DDCLIENT_PNAME, proc) # Cleanup VRF self.cli_delete(['vrf', 'name', vrf_name]) if __name__ == '__main__': unittest.main(verbosity=2)