diff options
| -rw-r--r-- | python/vyos/utils/file.py | 171 | 
1 files changed, 171 insertions, 0 deletions
| diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py new file mode 100644 index 000000000..2560a35be --- /dev/null +++ b/python/vyos/utils/file.py @@ -0,0 +1,171 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library.  If not, see <http://www.gnu.org/licenses/>. + +import os + + +def read_file(fname, defaultonfailure=None): +    """ +    read the content of a file, stripping any end characters (space, newlines) +    should defaultonfailure be not None, it is returned on failure to read +    """ +    try: +        """ Read a file to string """ +        with open(fname, 'r') as f: +            data = f.read().strip() +        return data +    except Exception as e: +        if defaultonfailure is not None: +            return defaultonfailure +        raise e + +def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False): +    """ +    Write content of data to given fname, should defaultonfailure be not None, +    it is returned on failure to read. + +    If directory of file is not present, it is auto-created. +    """ +    dirname = os.path.dirname(fname) +    if not os.path.isdir(dirname): +        os.makedirs(dirname, mode=0o755, exist_ok=False) +        chown(dirname, user, group) + +    try: +        """ Write a file to string """ +        bytes = 0 +        with open(fname, 'w' if not append else 'a') as f: +            bytes = f.write(data) +        chown(fname, user, group) +        chmod(fname, mode) +        return bytes +    except Exception as e: +        if defaultonfailure is not None: +            return defaultonfailure +        raise e + +def read_json(fname, defaultonfailure=None): +    """ +    read and json decode the content of a file +    should defaultonfailure be not None, it is returned on failure to read +    """ +    import json +    try: +        with open(fname, 'r') as f: +            data = json.load(f) +        return data +    except Exception as e: +        if defaultonfailure is not None: +            return defaultonfailure +        raise e + +def chown(path, user, group): +    """ change file/directory owner """ +    from pwd import getpwnam +    from grp import getgrnam + +    if user is None or group is None: +        return False + +    # path may also be an open file descriptor +    if not isinstance(path, int) and not os.path.exists(path): +        return False + +    uid = getpwnam(user).pw_uid +    gid = getgrnam(group).gr_gid +    os.chown(path, uid, gid) +    return True + + +def chmod(path, bitmask): +    # path may also be an open file descriptor +    if not isinstance(path, int) and not os.path.exists(path): +        return +    if bitmask is None: +        return +    os.chmod(path, bitmask) + + +def chmod_600(path): +    """ Make file only read/writable by owner """ +    from stat import S_IRUSR, S_IWUSR + +    bitmask = S_IRUSR | S_IWUSR +    chmod(path, bitmask) + + +def chmod_750(path): +    """ Make file/directory only executable to user and group """ +    from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP + +    bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP +    chmod(path, bitmask) + + +def chmod_755(path): +    """ Make file executable by all """ +    from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH + +    bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ +              S_IROTH | S_IXOTH +    chmod(path, bitmask) + + +def makedir(path, user=None, group=None): +    if os.path.exists(path): +        return +    os.makedirs(path, mode=0o755) +    chown(path, user, group) + +def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1): +    """ Waits for an inotify event to occur """ +    if not os.path.dirname(file_path): +        raise ValueError( +          "File path {} does not have a directory part (required for inotify watching)".format(file_path)) +    if not os.path.basename(file_path): +        raise ValueError( +          "File path {} does not have a file part, do not know what to watch for".format(file_path)) + +    from inotify.adapters import Inotify +    from time import time +    from time import sleep + +    time_start = time() + +    i = Inotify() +    i.add_watch(os.path.dirname(file_path)) + +    if pre_hook: +        pre_hook() + +    for event in i.event_gen(yield_nones=True): +        if (timeout is not None) and ((time() - time_start) > timeout): +            # If the function didn't return until this point, +            # the file failed to have been written to and closed within the timeout +            raise OSError("Waiting for file {} to be written has failed".format(file_path)) + +        # Most such events don't take much time, so it's better to check right away +        # and sleep later. +        if event is not None: +            (_, type_names, path, filename) = event +            if filename == os.path.basename(file_path): +                if event_type in type_names: +                    return +        sleep(sleep_interval) + +def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1): +    """ Waits for a process to close a file after opening it in write mode. """ +    wait_for_inotify(file_path, +      event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval) | 
