diff options
Diffstat (limited to 'src/op_mode/file.py')
| -rwxr-xr-x | src/op_mode/file.py | 383 | 
1 files changed, 383 insertions, 0 deletions
| diff --git a/src/op_mode/file.py b/src/op_mode/file.py new file mode 100755 index 000000000..bf13bed6f --- /dev/null +++ b/src/op_mode/file.py @@ -0,0 +1,383 @@ +#!/usr/bin/python3 + +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library.  If not, see <http://www.gnu.org/licenses/>. + +import argparse +import contextlib +import datetime +import grp +import os +import pwd +import shutil +import sys +import tempfile + +from vyos.remote import download +from vyos.remote import upload +from vyos.utils.io import ask_yes_no +from vyos.utils.io import print_error +from vyos.utils.process import cmd +from vyos.utils.process import run + + +parser = argparse.ArgumentParser(description='view, copy or remove files and directories', +                                 formatter_class=argparse.RawDescriptionHelpFormatter) +parser.epilog = """ +TYPE is one of 'remote', 'image' and 'local'. +A local path is <path> or ~/<path>. +A remote path is <scheme>://<urn>. +An image path is <image>:<path>. + +Clone operation is between images only. +Copy operation does not support directories from remote locations. +Delete operation does not support remote paths. +""" +operations = parser.add_mutually_exclusive_group(required=True) +operations.add_argument('--show', nargs=1, help='show the contents of file PATH of type TYPE', +                        metavar=('PATH')) +operations.add_argument('--copy', nargs=2, help='copy SRC to DEST', +                        metavar=('SRC', 'DEST')) +operations.add_argument('--delete', nargs=1, help='delete file PATH', +                        metavar=('PATH')) +operations.add_argument('--clone', help='clone config from running image to IMG', +                        metavar='IMG') +operations.add_argument('--clone-from', nargs=2, help='clone config from image SRC to image DEST', +                        metavar=('SRC', 'DEST')) + +## Helper procedures +def fix_terminal() -> None: +    """ +    Reset terminal after potential breakage caused by abrupt exits. +    """ +    run('stty sane') + +def get_types(arg: str) -> tuple[str, str]: +    """ +    Determine whether the argument shows a local, image or remote path. +    """ +    schemes = ['http', 'https', 'ftp', 'ftps', 'sftp', 'ssh', 'scp', 'tftp'] +    s = arg.split("://", 1) +    if len(s) != 2: +        return 'local', arg +    elif s[0] in schemes: +        return 'remote', arg +    else: +        return 'image', arg + +def zealous_copy(source: str, destination: str) -> None: +    # Even shutil.copy2() doesn't preserve ownership across copies. +    # So we need to resort to this. +    stats = os.stat(source) +    shutil.copy2(source, destination) +    os.chown(destination, stats.st_uid, stats.st_gid) + +def get_file_type(path: str) -> str: +    return cmd(['file', '-sb', path]) + +def print_header(string: str) -> None: +    print('#' * 10, string, '#' * 10) + +def octal_to_symbolic(octal: str) -> str: +    perms = ['---', '--x', '-w-', '-wx', 'r--', 'r-x', 'rw-', 'rwx'] +    result = "" +    # We discard all but the last three digits because we're only +    # interested in the permission bits. +    for i in octal[-3:]: +        result += perms[int(i)] +    return result + +def get_user_and_group(stats: os.stat_result) -> tuple[str, str]: +    try: +        user = pwd.getpwuid(stats.st_uid).pw_name +    except (KeyError, PermissionError): +        user = str(stats.st_uid) +    try: +        group = grp.getgrgid(stats.st_gid).gr_name +    except (KeyError, PermissionError): +        group = str(stats.st_gid) +    return user, group + +def print_file_info(path: str) -> None: +    stats = os.stat(path) +    username, groupname = get_user_and_group(stats) +    mtime = datetime.datetime.fromtimestamp(stats.st_mtime).strftime("%F %X") +    print_header('FILE INFO') +    print(f'Path:\t\t{path}') +    # File type is determined through `file(1)`. +    print(f'Type:\t\t{get_file_type(path)}') +    # Owner user and group +    print(f'Owner:\t\t{username}:{groupname}') +    # Permissions are converted from raw int to octal string to symbolic string. +    print(f'Permissions:\t{octal_to_symbolic(oct(stats.st_mode))}') +    # Last date of modification +    print(f'Modified:\t{mtime}') + +def print_file_data(path: str) -> None: +    print_header('FILE DATA') +    file_type = get_file_type(path) +    # Human-readable files are streamed line-by-line. +    if 'text' in file_type: +        with open(path, 'r') as f: +            for line in f: +                print(line, end='') +    # tcpdump files go to TShark. +    elif 'pcap' in file_type or os.path.splitext(path)[1] == '.pcap': +        print(cmd(['sudo', 'tshark', '-r', path])) +    # All other binaries get hexdumped. +    else: +        print(cmd(['hexdump', '-C', path])) + +def parse_image_path(image_path: str) -> str: +    """ +    my-image:/foo/bar -> /lib/live/mount/persistence/boot/my-image/rw/foo/bar +    """ +    image_name, path = image_path.split('://', 1) +    if image_name == 'running': +        image_root = '/' +    elif image_name == 'disk-install': +        image_root = '/lib/live/mount/persistence/' +    else: +        image_root = os.path.join('/lib/live/mount/persistence/boot', image_name, 'rw') +        if not os.path.isdir(image_root): +            print_error(f'Image {image_name} not found.') +            sys.exit(1) +    return os.path.join(image_root, path) + + +## Show procedures +def show_locally(path: str) -> None: +    """ +    Display the contents of a local file or directory. +    """ +    location = os.path.realpath(os.path.expanduser(path)) +    # Temporarily redirect stdout to a throwaway file for `less(1)` to read. +    # The output could be potentially too hefty for an in-memory StringIO. +    temp = tempfile.NamedTemporaryFile('w', delete=False) +    try: +        with contextlib.redirect_stdout(temp): +            # Just a directory. Call `ls(1)` and bail. +            if os.path.isdir(location): +                print_header('DIRECTORY LISTING') +                print('Path:\t', location) +                print(cmd(['ls', '-hlFGL', '--group-directories-first', location])) +            elif os.path.isfile(location): +                print_file_info(location) +                print() +                print_file_data(location) +            else: +                print_error(f'File or directory {path} not found.') +                sys.exit(1) +            sys.stdout.flush() +        # Call `less(1)` and wait for it to terminate before going forward. +        cmd(['/usr/bin/less', '-X', temp.name], stdout=sys.stdout) +    # The stream to the temporary file could break for any reason. +    # It's much less fragile than if we streamed directly to the process stdin. +    # But anything could still happen and we don't want to scare the user. +    except (BrokenPipeError, EOFError, KeyboardInterrupt, OSError): +        fix_terminal() +        sys.exit(1) +    finally: +        os.remove(temp.name) + +def show(type: str, path: str) -> None: +    if type == 'remote': +        temp = tempfile.NamedTemporaryFile(delete=False) +        download(temp.name, path) +        show_locally(temp.name) +        os.remove(temp.name) +    elif type == 'image': +        show_locally(parse_image_path(path)) +    elif type == 'local': +        show_locally(path) +    else: +        print_error(f'Unknown target for showing: {type}') +        print_error('Valid types are "remote", "image" and "local".') +        sys.exit(1) + + +## Copying procedures +def copy(source_type: str, source_path: str, +         destination_type: str, destination_path: str) -> None: +    """ +    Copy a file or directory locally, remotely or to and from an image. +    Directory uploads and downloads not supported. +    """ +    source = '' +    try: +        # Download to a temporary file and use that as the source. +        if source_type == 'remote': +            source = tempfile.NamedTemporaryFile(delete=False).name +            download(source, source_path) +        # Prepend the image root to the path. +        elif source_type == 'image': +            source = parse_image_path(source_path) +        elif source_type == 'local': +            source = source_path +        else: +            print_error(f'Unknown source type: {source_type}') +            print_error(f'Valid source types are "remote", "image" and "local".') +            sys.exit(1) + +        # Directly upload the file. +        if destination_type == 'remote': +            if os.path.isdir(source): +                print_error(f'Cannot upload {source}. Directory uploads not supported.') +                sys.exit(1) +            upload(source, destination_path) +        # No need to duplicate local copy operations for image copying. +        elif destination_type == 'image': +            copy('local', source, 'local', parse_image_path(destination_path)) +        # Try to preserve metadata when copying. +        elif destination_type == 'local': +            if os.path.isdir(destination_path): +                destination_path = os.path.join(destination_path, os.path.basename(source)) +            if os.path.isdir(source): +                shutil.copytree(source, destination_path, copy_function=zealous_copy) +            else: +                zealous_copy(source, destination_path) +        else: +            print_error(f'Unknown destination type: {source_type}') +            print_error(f'Valid destination types are "remote", "image" and "local".') +            sys.exit(1) +    except OSError: +        import traceback +        # We can't check for every single user error (eg copying a directory to a file) +        #  so we just let a curtailed stack trace provide a descriptive error. +        print_error(f'Failed to copy {source_path} to {destination_path}.') +        traceback.print_exception(*sys.exc_info()[:2], None) +        sys.exit(1) +    else: +        # To prevent a duplicate message. +        if destination_type != 'image': +            print('Copy successful.') +    finally: +        # Clean up temporary file. +        if source_type == 'remote': +            os.remove(source) + + +## Deletion procedures +def delete_locally(path: str) -> None: +    """ +    Remove a local file or directory. +    """ +    try: +        if os.path.isdir(path): +            if (ask_yes_no(f'Do you want to remove {path} with all its contents?')): +                shutil.rmtree(path) +                print(f'Directory {path} removed.') +            else: +                print('Operation aborted.') +        elif os.path.isfile(path): +            if (ask_yes_no(f'Do you want to remove {path}?')): +                os.remove(path) +                print(f'File {path} removed.') +            else: +                print('Operation aborted.') +        else: +            raise OSError(f'File or directory {path} not found.') +    except OSError: +        import traceback +        print_error(f'Failed to delete {path}.') +        traceback.print_exception(*sys.exc_info()[:2], None) +        sys.exit(1) + +def delete(type: str, path: str) -> None: +    if type == 'local': +        delete_locally(path) +    elif type == 'image': +        delete_locally(parse_image_path(path)) +    else: +        print_error(f'Unknown target for deletion: {type}') +        print_error('Valid types are "image" and "local".') +        sys.exit(1) + + +## Cloning procedures +def clone(source: str, destination: str) -> None: +    if os.geteuid(): +        print_error('Only the superuser can run this command.') +        sys.exit(1) +    if destination == 'running' or destination == 'disk-install': +        print_error(f'Cannot clone config to {destination}.') +        sys.exit(1) +    # If `source` is None, then we're going to copy from the running image. +    if source is None or source == 'running': +        source_path = '/config' +        # For the warning message only. +        source = 'the current' +    else: +        source_path = parse_image_path(source + ':/config') +    destination_path = parse_image_path(destination + ':/config') +    backup_path = destination_path + '.preclone' + +    if not os.path.isdir(source_path): +        print_error(f'Source image {source} does not exist.') +        sys.exit(1) +    if not os.path.isdir(destination_path): +        print_error(f'Destination image {destination} does not exist.') +        sys.exit(1) +    print(f'WARNING: This operation will erase /config data in image {destination}.') +    print(f'/config data in {source} image will be copied over in its place.') +    print(f'The existing /config data in {destination} image will be backed up to /config.preclone.') + +    if ask_yes_no('Are you sure you want to continue?'): +        try: +            if os.path.isdir(backup_path): +                print('Removing previous backup...') +                shutil.rmtree(backup_path) +            print('Making new backup...') +            shutil.move(destination_path, backup_path) +        except: +            print('Something went wrong during the backup process!') +            print('Cowardly refusing to proceed with cloning.') +            raise +        # Copy new config from image. +        try: +            shutil.copytree(source_path, destination_path, copy_function=zealous_copy) +        except: +            print('Cloning failed! Reverting to backup!') +            # Delete leftover files from the botched cloning. +            shutil.rmtree(destination_path, ignore_errors=True) +            # Restore backup before bailing out. +            shutil.copytree(backup_path, destination_path, copy_function=zealous_copy) +            raise +        else: +            print(f'Successfully cloned config from {source} to {destination}.') +        finally: +            shutil.rmtree(backup_path) +    else: +        print('Operation aborted.') + +if __name__ == '__main__': +    args = parser.parse_args() +    try: +        if args.show: +            show(*get_types(args.show[0])) +        elif args.copy: +            copy(*get_types(args.copy[0]), +                 *get_types(args.copy[1])) +        elif args.delete: +            delete(*get_types(args.delete[0])) +        elif args.clone_from: +            clone(*args.clone_from) +        elif args.clone: +            # Pass None as source image to copy from local image. +            clone(None, args.clone) +    except KeyboardInterrupt: +        print_error('Operation cancelled by user.') +        sys.exit(1) +    sys.exit(0) | 
