diff options
| -rw-r--r-- | python/vyos/progressbar.py | 33 | ||||
| -rw-r--r-- | python/vyos/remote.py | 43 | ||||
| -rw-r--r-- | python/vyos/utils/io.py | 10 | 
3 files changed, 45 insertions, 41 deletions
| diff --git a/python/vyos/progressbar.py b/python/vyos/progressbar.py index 1793c445b..7bc9d9856 100644 --- a/python/vyos/progressbar.py +++ b/python/vyos/progressbar.py @@ -19,26 +19,35 @@ import signal  import subprocess  import sys +from vyos.utils.io import is_dumb_terminal  from vyos.utils.io import print_error +  class Progressbar:      def __init__(self, step=None):          self.total = 0.0          self.step = step +        # Silently ignore all calls if terminal capabilities are lacking. +        # This will also prevent the output from littering Ansible logs, +        # as `ansible.netcommon.network_cli' coaxes the terminal into believing +        # it is interactive. +        self._dumb = is_dumb_terminal()      def __enter__(self): -        # Recalculate terminal width with every window resize. -        signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols()) -        # Disable line wrapping to prevent the staircase effect. -        subprocess.run(['tput', 'rmam'], check=False) -        self._update_cols() -        # Print an empty progressbar with entry. -        self.progress(0, 1) +        if not self._dumb: +            # Recalculate terminal width with every window resize. +            signal.signal(signal.SIGWINCH, lambda signum, frame: self._update_cols()) +            # Disable line wrapping to prevent the staircase effect. +            subprocess.run(['tput', 'rmam'], check=False) +            self._update_cols() +            # Print an empty progressbar with entry. +            self.progress(0, 1)          return self      def __exit__(self, exc_type, kexc_val, exc_tb): -        # Revert to the default SIGWINCH handler (ie nothing). -        signal.signal(signal.SIGWINCH, signal.SIG_DFL) -        # Reenable line wrapping. -        subprocess.run(['tput', 'smam'], check=False) +        if not self._dumb: +            # Revert to the default SIGWINCH handler (ie nothing). +            signal.signal(signal.SIGWINCH, signal.SIG_DFL) +            # Reenable line wrapping. +            subprocess.run(['tput', 'smam'], check=False)      def _update_cols(self):          # `os.get_terminal_size()' is fast enough for our purposes.          self.col = max(os.get_terminal_size().columns - 15, 20) @@ -60,7 +69,7 @@ class Progressbar:          Stateless progressbar taking no input at init and current progress with          final size at callback (for SSH)          """ -        if done <= total: +        if done <= total and not self._dumb:              length = math.ceil(self.col * done / total)              percentage = str(math.ceil(100 * done / total)).rjust(3)              # Carriage return at the end will make sure the line will get overwritten. diff --git a/python/vyos/remote.py b/python/vyos/remote.py index 1ca8a9530..4be477d24 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -34,6 +34,7 @@ from requests.packages.urllib3 import PoolManager  from vyos.progressbar import Progressbar  from vyos.utils.io import ask_yes_no +from vyos.utils.io import is_interactive  from vyos.utils.io import print_error  from vyos.utils.misc import begin  from vyos.utils.process import cmd @@ -49,7 +50,7 @@ class InteractivePolicy(MissingHostKeyPolicy):      def missing_host_key(self, client, hostname, key):          print_error(f"Host '{hostname}' not found in known hosts.")          print_error('Fingerprint: ' + key.get_fingerprint().hex()) -        if sys.stdout.isatty() and ask_yes_no('Do you wish to continue?'): +        if is_interactive() and ask_yes_no('Do you wish to continue?'):              if client._host_keys_filename\                 and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'):                  client._host_keys.add(hostname, key.get_name(), key) @@ -250,7 +251,6 @@ class HttpC:                          allow_redirects=True,                          timeout=self.timeout) as r:                  # Abort early if the destination is inaccessible. -                print('pre-3')                  r.raise_for_status()                  # If the request got redirected, keep the last URL we ended up with.                  final_urlstring = r.url @@ -323,17 +323,25 @@ def urlc(urlstring, *args, **kwargs):      except KeyError:          raise ValueError(f'Unsupported URL scheme: "{url.scheme}"') -def download(local_path, urlstring, *args, **kwargs): +def download(local_path, urlstring, progressbar=False, check_space=False, +             source_host='', source_port=0, timeout=10.0):      try: -        urlc(urlstring, *args, **kwargs).download(local_path) +        progressbar = progressbar and is_interactive() +        urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path)      except Exception as err:          print_error(f'Unable to download "{urlstring}": {err}') +    except KeyboardInterrupt: +        print_error('\nDownload aborted by user.') -def upload(local_path, urlstring, *args, **kwargs): +def upload(local_path, urlstring, progressbar=False, +           source_host='', source_port=0, timeout=10.0):      try: -        urlc(urlstring, *args, **kwargs).upload(local_path) +        progressbar = progressbar and is_interactive() +        urlc(urlstring, progressbar, source_host, source_port, timeout).upload(local_path)      except Exception as err:          print_error(f'Unable to upload "{urlstring}": {err}') +    except KeyboardInterrupt: +        print_error('\nUpload aborted by user.')  def get_remote_config(urlstring, source_host='', source_port=0):      """ @@ -346,26 +354,3 @@ def get_remote_config(urlstring, source_host='', source_port=0):              return f.read()      finally:          os.remove(temp) - -def friendly_download(local_path, urlstring, source_host='', source_port=0): -    """ -    Download with a progress bar, reassuring messages and free space checks. -    """ -    try: -        print_error('Downloading...') -        download(local_path, urlstring, True, True, source_host, source_port) -    except KeyboardInterrupt: -        print_error('\nDownload aborted by user.') -        sys.exit(1) -    except: -        import traceback -        print_error(f'Failed to download {urlstring}.') -        # There are a myriad different reasons a download could fail. -        # SSH errors, FTP errors, I/O errors, HTTP errors (403, 404...) -        # We omit the scary stack trace but print the error nevertheless. -        exc_type, exc_value, exc_traceback = sys.exc_info() -        traceback.print_exception(exc_type, exc_value, None, 0, None, False) -        sys.exit(1) -    else: -        print_error('Download complete.') -        sys.exit(0) diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 5fffa62f8..8790cbaac 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -62,3 +62,13 @@ def ask_yes_no(question, default=False) -> bool:                  stdout.write("Please respond with yes/y or no/n\n")          except EOFError:              stdout.write("\nPlease respond with yes/y or no/n\n") + +def is_interactive(): +    """Try to determine if the routine was called from an interactive shell.""" +    import os, sys +    return os.getenv('TERM', default=False) and sys.stderr.isatty() and sys.stdout.isatty() + +def is_dumb_terminal(): +    """Check if the current TTY is dumb, so that we can disable advanced terminal features.""" +    import os +    return os.getenv('TERM') in ['vt100', 'dumb'] | 
