diff options
-rw-r--r-- | python/vyos/remote.py | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/python/vyos/remote.py b/python/vyos/remote.py index 47af9d3a6..ad9706a82 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -48,7 +48,7 @@ def upload_sftp(local_path, hostname, remote_path,\ sftp.put(local_path, remote_path) def download_sftp(local_path, hostname, remote_path,\ - username, password=None, port=22): + username=None, password=None, port=22): with SSHClient() as ssh: ssh.load_system_host_keys() ssh.connect(hostname, port, username, password) @@ -63,15 +63,51 @@ def download_tftp(local_path, hostname, remote_path, port=69): with open(local_path, 'wb') as file: file.write(cmd(f'curl -s tftp://{hostname}:{port}/{remote_path}', stderr=None)) - def download_http(urlstring, local_path): with open(local_path, 'wb') as file: with urllib.request.urlopen(urlstring) as response: file.write(response.read()) -def get_remote_config(urlstring: str) -> bytes: - """Download remote (config) file and return the contents. +def download(local_path, urlstring): + """ + Dispatch the appropriate download function for the given URL and save to local path. + """ + url = urllib.parse.urlparse(urlstring) + if url.scheme == 'http' or url.scheme == 'https': + download_http(urlstring, local_path) + elif url.scheme == 'ftp': + username = url.username if url.username else 'anonymous' + download_ftp(local_path, url.hostname, url.path, username, url.password) + elif url.scheme == 'sftp' or url.scheme == 'scp': + # None means we don't want to use password authentication. + # An empty string (what urlparse returns when a password doesn't + # exist in the URL) means the password is an empty string. + password = url.password if url.password else None + download_sftp(local_path, url.hostname, url.path, url.username, password) + elif url.scheme == 'tftp': + download_tftp(local_path, url.hostname, url.path) + else: + ValueError(f'Unsupported URL scheme: {url.scheme}') +def upload(local_path, urlstring): + """ + Dispatch the appropriate upload function for the given URL and upload from local path. + """ + url = urllib.parse.urlparse(urlstring) + if url.scheme == 'ftp': + username = url.username if url.username else 'anonymous' + upload_ftp(local_path, url.hostname, url.path, username, url.password) + elif url.scheme == 'sftp' or url.scheme == 'scp': + password = url.password if url.password else None + upload_sftp(local_path, url.hostname, url.path, url.username, password) + elif url.scheme == 'tftp': + upload_tftp(local_path, url.hostname, url.path) + else: + ValueError(f'Unsupported URL scheme: {url.scheme}') + +def get_remote_config(urlstring): + """ + Download remote (config) file and return the contents. Args: remote file URI: scp://<user>[:<passwd>]@<host>/<file> @@ -84,23 +120,8 @@ def get_remote_config(urlstring: str) -> bytes: url = urllib.parse.urlparse(urlstring) temp = tempfile.NamedTemporaryFile(delete=False).name try: - if url.scheme == 'http' or url.scheme == 'https': - download_http(urlstring, temp) - elif url.scheme == 'ftp': - username = url.username if url.username else 'anonymous' - download_ftp(temp, url.hostname, url.path, username, url.password) - elif url.scheme == 'sftp' or url.scheme == 'scp': - # None means we don't want to use password authentication. - # An empty string (what urlparse returns when a password doesn't - # exist in the URL) means the password is an empty string. - password = url.password if url.password else None - download_sftp(temp, url.hostname, url.path, url.username, password) - elif url.scheme == 'tftp': - download_tftp(temp, url.path, url.hostname, url.path) - else: - sys.exit('Unsupported URL scheme') + download(temp, urlstring) with open(temp, 'r') as file: - config = file.read() - return config + return file.read() finally: os.remove(temp) |