summaryrefslogtreecommitdiff
path: root/python/vyos/remote.py
blob: 0c0de8e0f0f6d9c545e9261655799b598e0d2715 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright 2021 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import tempfile
from ftplib import FTP
import urllib.parse
import urllib.request

from vyos.util import cmd
from paramiko import SSHClient

def upload_ftp(local_path, hostname, remote_path,
               username='anonymous', password='', port=21):
    """
    Upload the file at local_path to an FTP server.

    The local file is opened in binary mode, the connection is made to
    hostname:port, the given credentials are used to log in and the data
    is stored on the server under remote_path with a STOR command.
    """
    with open(local_path, 'rb') as source, FTP() as session:
        session.connect(hostname, port)
        session.login(username, password)
        session.storbinary(f'STOR {remote_path}', source)

def download_ftp(local_path, hostname, remote_path,
                 username='anonymous', password='', port=21):
    """
    Download remote_path from an FTP server and save it as local_path.

    The local file is opened for binary writing before the connection to
    hostname:port is made; the data fetched with a RETR command is written
    to it block by block.
    """
    with open(local_path, 'wb') as sink, FTP() as session:
        session.connect(hostname, port)
        session.login(username, password)
        session.retrbinary(f'RETR {remote_path}', sink.write)

def upload_sftp(local_path, hostname, remote_path,
                username=None, password=None, port=22):
    """
    Upload the file at local_path to remote_path over SFTP.

    An SSH client is created, the system host keys are loaded into it and
    a connection to hostname:port is made with the given credentials; the
    file is then sent through an SFTP session opened on that connection.
    """
    with SSHClient() as client:
        client.load_system_host_keys()
        client.connect(hostname, port=port, username=username, password=password)
        with client.open_sftp() as transfer:
            transfer.put(local_path, remote_path)

def download_sftp(local_path, hostname, remote_path,
                  username=None, password=None, port=22):
    """
    Download remote_path over SFTP and save it as local_path.

    An SSH client is created, the system host keys are loaded into it and
    a connection to hostname:port is made with the given credentials; the
    file is then fetched through an SFTP session opened on that connection.
    """
    with SSHClient() as client:
        client.load_system_host_keys()
        client.connect(hostname, port=port, username=username, password=password)
        with client.open_sftp() as transfer:
            transfer.get(remote_path, local_path)

def upload_tftp(local_path, hostname, remote_path, port=69):
    """
    Upload the file at local_path to a TFTP server using curl.

    The file contents are read in binary mode and handed to the curl
    command as its input; "-T -" tells curl to take the upload body from
    standard input. Nothing is returned.
    """
    with open(local_path, 'rb') as file:
        payload = file.read()
    # The command's output is not needed, so it is simply dropped (the
    # original called .encode() on it and threw the result away).
    cmd(f'curl -s -T - tftp://{hostname}:{port}/{remote_path}', stderr=None, input=payload)

def download_tftp(local_path, hostname, remote_path, port=69):
    """
    Download remote_path from a TFTP server using curl and save it as local_path.

    The local file is opened for binary writing first, then the output
    returned by the curl command is encoded and written to it.
    """
    # NOTE(review): the payload travels through a str (cmd's return value is
    # .encode()d here), so non-text files may not survive intact - confirm
    # how cmd decodes the command output.
    with open(local_path, 'wb') as sink:
        fetched = cmd(f'curl -s tftp://{hostname}:{port}/{remote_path}', stderr=None)
        sink.write(fetched.encode())

def download_http(urlstring, local_path):
    """
    Download the resource at urlstring and save it as local_path.

    The URL is opened before the local file so that a request which fails
    (unreachable host, HTTP error, ...) does not truncate or create
    local_path. The body is streamed to disk in chunks instead of being
    read into memory in one piece.

    Raises whatever urllib.request.urlopen raises for a failed request.
    """
    # Imported locally: only this function needs it.
    import shutil

    with urllib.request.urlopen(urlstring) as response:
        with open(local_path, 'wb') as file:
            shutil.copyfileobj(response, file)

def download(local_path, urlstring):
    """
    Dispatch the appropriate download function for the given URL and save to local path.

    Supported schemes are http, https, ftp, sftp, scp and tftp. A port given
    in the URL is passed on to the protocol helper; otherwise the protocol's
    default port is used.

    Raises:
        ValueError: if the URL scheme is not one of the supported ones.
    """
    url = urllib.parse.urlparse(urlstring)
    scheme = url.scheme
    if scheme in ('http', 'https'):
        # urlopen() takes the whole URL, so the port needs no special care.
        download_http(urlstring, local_path)
    elif scheme == 'ftp':
        username = url.username if url.username else 'anonymous'
        download_ftp(local_path, url.hostname, url.path, username, url.password,
                     url.port or 21)
    elif scheme in ('sftp', 'scp'):
        # None means we don't want to use password authentication.
        # urlparse gives None when the URL carries no password and an empty
        # string when it carries an empty one; both are treated as "no password".
        password = url.password if url.password else None
        download_sftp(local_path, url.hostname, url.path, url.username, password,
                      url.port or 22)
    elif scheme == 'tftp':
        download_tftp(local_path, url.hostname, url.path, url.port or 69)
    else:
        raise ValueError(f'Unsupported URL scheme: {url.scheme}')

def upload(local_path, urlstring):
    """
    Dispatch the appropriate upload function for the given URL and upload from local path.

    Supported schemes are ftp, sftp, scp and tftp. A port given in the URL is
    passed on to the protocol helper; otherwise the protocol's default port
    is used.

    Raises:
        ValueError: if the URL scheme is not one of the supported ones.
    """
    url = urllib.parse.urlparse(urlstring)
    scheme = url.scheme
    if scheme == 'ftp':
        username = url.username if url.username else 'anonymous'
        upload_ftp(local_path, url.hostname, url.path, username, url.password,
                   url.port or 21)
    elif scheme in ('sftp', 'scp'):
        # None means we don't want to use password authentication; a missing
        # or empty password in the URL is treated as "no password".
        password = url.password if url.password else None
        upload_sftp(local_path, url.hostname, url.path, url.username, password,
                    url.port or 22)
    elif scheme == 'tftp':
        upload_tftp(local_path, url.hostname, url.path, url.port or 69)
    else:
        raise ValueError(f'Unsupported URL scheme: {url.scheme}')

def get_remote_config(urlstring):
    """
    Download remote (config) file and return the contents.
        Args:
            remote file URI:
                scp://<user>[:<passwd>]@<host>/<file>
                sftp://<user>[:<passwd>]@<host>/<file>
                http://<host>/<file>
                https://<host>/<file>
                ftp://[<user>[:<passwd>]@]<host>/<file>
                tftp://<host>/<file>
        Returns:
            The downloaded file's contents as a string (read in text mode).

    The file is fetched into a temporary file which is removed again before
    returning, whether or not the download succeeded.
    """
    # Only the name is needed: download() reopens the file by path, so the
    # handle is closed right away instead of being left to the garbage collector.
    with tempfile.NamedTemporaryFile(delete=False) as handle:
        temp = handle.name
    try:
        download(temp, urlstring)
        with open(temp, 'r') as file:
            return file.read()
    finally:
        os.remove(temp)