diff options
author | James Falcon <james.falcon@canonical.com> | 2021-12-15 20:16:38 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-15 19:16:38 -0700 |
commit | bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf (patch) | |
tree | 1fbb3269fc87e39832e3286ef42eefd2b23fcd44 /cloudinit/config/cc_rh_subscription.py | |
parent | 2bcf4fa972fde686c2e3141c58e640640b44dd00 (diff) | |
download | vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.tar.gz vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.zip |
Adopt Black and isort (SC-700) (#1157)
Applied Black and isort, fixed any linting issues, updated tox.ini
and CI.
Diffstat (limited to 'cloudinit/config/cc_rh_subscription.py')
-rw-r--r-- | cloudinit/config/cc_rh_subscription.py | 240 |
1 files changed, 139 insertions, 101 deletions
diff --git a/cloudinit/config/cc_rh_subscription.py b/cloudinit/config/cc_rh_subscription.py index 693317c2..b81a7a9b 100644 --- a/cloudinit/config/cc_rh_subscription.py +++ b/cloudinit/config/cc_rh_subscription.py @@ -39,12 +39,11 @@ Subscription`` example config. """ from cloudinit import log as logging -from cloudinit import subp -from cloudinit import util +from cloudinit import subp, util LOG = logging.getLogger(__name__) -distros = ['fedora', 'rhel'] +distros = ["fedora", "rhel"] def handle(name, cfg, _cloud, log, _args): @@ -60,8 +59,9 @@ def handle(name, cfg, _cloud, log, _args): raise SubscriptionError(verify_msg) cont = sm.rhn_register() if not cont: - raise SubscriptionError("Registration failed or did not " - "run completely") + raise SubscriptionError( + "Registration failed or did not run completely" + ) # Splitting up the registration, auto-attach, and servicelevel # commands because the error codes, messages from subman are not @@ -70,8 +70,7 @@ def handle(name, cfg, _cloud, log, _args): # Attempt to change the service level if sm.auto_attach and sm.servicelevel is not None: if not sm._set_service_level(): - raise SubscriptionError("Setting of service-level " - "failed") + raise SubscriptionError("Setting of service-level failed") else: sm.log.debug("Completed auto-attach with service level") elif sm.auto_attach: @@ -87,8 +86,9 @@ def handle(name, cfg, _cloud, log, _args): return_stat = sm.addPool(sm.pools) if not return_stat: - raise SubscriptionError("Unable to attach pools {0}" - .format(sm.pools)) + raise SubscriptionError( + "Unable to attach pools {0}".format(sm.pools) + ) return_stat = sm.update_repos() if not return_stat: raise SubscriptionError("Unable to add or remove repos") @@ -105,72 +105,87 @@ class SubscriptionError(Exception): class SubscriptionManager(object): - valid_rh_keys = ['org', 'activation-key', 'username', 'password', - 'disable-repo', 'enable-repo', 'add-pool', - 'rhsm-baseurl', 'server-hostname', - 'auto-attach', 'service-level'] + valid_rh_keys = [ + "org", + "activation-key", + "username", + "password", + "disable-repo", + "enable-repo", + "add-pool", + "rhsm-baseurl", + "server-hostname", + "auto-attach", + "service-level", + ] def __init__(self, cfg, log=None): if log is None: log = LOG self.log = log self.cfg = cfg - self.rhel_cfg = self.cfg.get('rh_subscription', {}) - self.rhsm_baseurl = self.rhel_cfg.get('rhsm-baseurl') - self.server_hostname = self.rhel_cfg.get('server-hostname') - self.pools = self.rhel_cfg.get('add-pool') - self.activation_key = self.rhel_cfg.get('activation-key') - self.org = self.rhel_cfg.get('org') - self.userid = self.rhel_cfg.get('username') - self.password = self.rhel_cfg.get('password') - self.auto_attach = self.rhel_cfg.get('auto-attach') - self.enable_repo = self.rhel_cfg.get('enable-repo') - self.disable_repo = self.rhel_cfg.get('disable-repo') - self.servicelevel = self.rhel_cfg.get('service-level') + self.rhel_cfg = self.cfg.get("rh_subscription", {}) + self.rhsm_baseurl = self.rhel_cfg.get("rhsm-baseurl") + self.server_hostname = self.rhel_cfg.get("server-hostname") + self.pools = self.rhel_cfg.get("add-pool") + self.activation_key = self.rhel_cfg.get("activation-key") + self.org = self.rhel_cfg.get("org") + self.userid = self.rhel_cfg.get("username") + self.password = self.rhel_cfg.get("password") + self.auto_attach = self.rhel_cfg.get("auto-attach") + self.enable_repo = self.rhel_cfg.get("enable-repo") + self.disable_repo = self.rhel_cfg.get("disable-repo") + self.servicelevel = self.rhel_cfg.get("service-level") def log_success(self, msg): - '''Simple wrapper for logging info messages. Useful for unittests''' + """Simple wrapper for logging info messages. Useful for unittests""" self.log.info(msg) def log_warn(self, msg): - '''Simple wrapper for logging warning messages. Useful for unittests''' + """Simple wrapper for logging warning messages. Useful for unittests""" self.log.warning(msg) def _verify_keys(self): - ''' + """ Checks that the keys in the rh_subscription dict from the user-data are what we expect. - ''' + """ for k in self.rhel_cfg: if k not in self.valid_rh_keys: - bad_key = "{0} is not a valid key for rh_subscription. "\ - "Valid keys are: "\ - "{1}".format(k, ', '.join(self.valid_rh_keys)) + bad_key = ( + "{0} is not a valid key for rh_subscription. " + "Valid keys are: " + "{1}".format(k, ", ".join(self.valid_rh_keys)) + ) return False, bad_key # Check for bad auto-attach value - if (self.auto_attach is not None) and \ - not (util.is_true(self.auto_attach) or - util.is_false(self.auto_attach)): - not_bool = "The key auto-attach must be a boolean value "\ - "(True/False " + if (self.auto_attach is not None) and not ( + util.is_true(self.auto_attach) or util.is_false(self.auto_attach) + ): + not_bool = ( + "The key auto-attach must be a boolean value (True/False " + ) return False, not_bool - if (self.servicelevel is not None) and ((not self.auto_attach) or - (util.is_false(str(self.auto_attach)))): - no_auto = ("The service-level key must be used in conjunction " - "with the auto-attach key. Please re-run with " - "auto-attach: True") + if (self.servicelevel is not None) and ( + (not self.auto_attach) or (util.is_false(str(self.auto_attach))) + ): + no_auto = ( + "The service-level key must be used in conjunction " + "with the auto-attach key. Please re-run with " + "auto-attach: True" + ) return False, no_auto return True, None def is_registered(self): - ''' + """ Checks if the system is already registered and returns True if so, else False - ''' - cmd = ['identity'] + """ + cmd = ["identity"] try: _sub_man_cli(cmd) @@ -180,15 +195,18 @@ class SubscriptionManager(object): return True def rhn_register(self): - ''' + """ Registers the system by userid and password or activation key and org. Returns True when successful False when not. - ''' + """ if (self.activation_key is not None) and (self.org is not None): # register by activation key - cmd = ['register', '--activationkey={0}'. - format(self.activation_key), '--org={0}'.format(self.org)] + cmd = [ + "register", + "--activationkey={0}".format(self.activation_key), + "--org={0}".format(self.org), + ] # If the baseurl and/or server url are passed in, we register # with them. @@ -203,14 +221,18 @@ class SubscriptionManager(object): return_out = _sub_man_cli(cmd, logstring_val=True)[0] except subp.ProcessExecutionError as e: if e.stdout == "": - self.log_warn("Registration failed due " - "to: {0}".format(e.stderr)) + self.log_warn( + "Registration failed due to: {0}".format(e.stderr) + ) return False elif (self.userid is not None) and (self.password is not None): # register by username and password - cmd = ['register', '--username={0}'.format(self.userid), - '--password={0}'.format(self.password)] + cmd = [ + "register", + "--username={0}".format(self.userid), + "--password={0}".format(self.password), + ] # If the baseurl and/or server url are passed in, we register # with them. @@ -226,15 +248,18 @@ class SubscriptionManager(object): return_out = _sub_man_cli(cmd, logstring_val=True)[0] except subp.ProcessExecutionError as e: if e.stdout == "": - self.log_warn("Registration failed due " - "to: {0}".format(e.stderr)) + self.log_warn( + "Registration failed due to: {0}".format(e.stderr) + ) return False else: - self.log_warn("Unable to register system due to incomplete " - "information.") - self.log_warn("Use either activationkey and org *or* userid " - "and password") + self.log_warn( + "Unable to register system due to incomplete information." + ) + self.log_warn( + "Use either activationkey and org *or* userid and password" + ) return False reg_id = return_out.split("ID: ")[1].rstrip() @@ -242,19 +267,25 @@ class SubscriptionManager(object): return True def _set_service_level(self): - cmd = ['attach', '--auto', '--servicelevel={0}' - .format(self.servicelevel)] + cmd = [ + "attach", + "--auto", + "--servicelevel={0}".format(self.servicelevel), + ] try: return_out = _sub_man_cli(cmd)[0] except subp.ProcessExecutionError as e: - if e.stdout.rstrip() != '': + if e.stdout.rstrip() != "": for line in e.stdout.split("\n"): - if line != '': + if line != "": self.log_warn(line) else: - self.log_warn("Setting the service level failed with: " - "{0}".format(e.stderr.strip())) + self.log_warn( + "Setting the service level failed with: {0}".format( + e.stderr.strip() + ) + ) return False for line in return_out.split("\n"): if line != "": @@ -262,7 +293,7 @@ class SubscriptionManager(object): return True def _set_auto_attach(self): - cmd = ['attach', '--auto'] + cmd = ["attach", "--auto"] try: return_out = _sub_man_cli(cmd)[0] except subp.ProcessExecutionError as e: @@ -274,52 +305,52 @@ class SubscriptionManager(object): return True def _getPools(self): - ''' + """ Gets the list pools for the active subscription and returns them in list form. - ''' + """ available = [] consumed = [] # Get all available pools - cmd = ['list', '--available', '--pool-only'] + cmd = ["list", "--available", "--pool-only"] results = _sub_man_cli(cmd)[0] available = (results.rstrip()).split("\n") # Get all consumed pools - cmd = ['list', '--consumed', '--pool-only'] + cmd = ["list", "--consumed", "--pool-only"] results = _sub_man_cli(cmd)[0] consumed = (results.rstrip()).split("\n") return available, consumed def _getRepos(self): - ''' + """ Obtains the current list of active yum repositories and returns them in list form. - ''' + """ - cmd = ['repos', '--list-enabled'] + cmd = ["repos", "--list-enabled"] return_out = _sub_man_cli(cmd)[0] active_repos = [] for repo in return_out.split("\n"): if "Repo ID:" in repo: - active_repos.append((repo.split(':')[1]).strip()) + active_repos.append((repo.split(":")[1]).strip()) - cmd = ['repos', '--list-disabled'] + cmd = ["repos", "--list-disabled"] return_out = _sub_man_cli(cmd)[0] inactive_repos = [] for repo in return_out.split("\n"): if "Repo ID:" in repo: - inactive_repos.append((repo.split(':')[1]).strip()) + inactive_repos.append((repo.split(":")[1]).strip()) return active_repos, inactive_repos def addPool(self, pools): - ''' + """ Takes a list of subscription pools and "attaches" them to the current subscription - ''' + """ # An empty list was passed if len(pools) == 0: @@ -328,31 +359,33 @@ class SubscriptionManager(object): pool_available, pool_consumed = self._getPools() pool_list = [] - cmd = ['attach'] + cmd = ["attach"] for pool in pools: if (pool not in pool_consumed) and (pool in pool_available): - pool_list.append('--pool={0}'.format(pool)) + pool_list.append("--pool={0}".format(pool)) else: self.log_warn("Pool {0} is not available".format(pool)) if len(pool_list) > 0: cmd.extend(pool_list) try: _sub_man_cli(cmd) - self.log.debug("Attached the following pools to your " - "system: %s", (", ".join(pool_list)) - .replace('--pool=', '')) + self.log.debug( + "Attached the following pools to your system: %s", + (", ".join(pool_list)).replace("--pool=", ""), + ) return True except subp.ProcessExecutionError as e: - self.log_warn("Unable to attach pool {0} " - "due to {1}".format(pool, e)) + self.log_warn( + "Unable to attach pool {0} due to {1}".format(pool, e) + ) return False def update_repos(self): - ''' + """ Takes a list of yum repo ids that need to be disabled or enabled; then it verifies if they are already enabled or disabled and finally executes the action to disable or enable - ''' + """ erepos = self.enable_repo drepos = self.disable_repo @@ -378,7 +411,7 @@ class SubscriptionManager(object): enable_list = [] enable_list_fail = [] for repoid in erepos: - if (repoid in inactive_repos): + if repoid in inactive_repos: enable_list.append("--enable={0}".format(repoid)) else: enable_list_fail.append(repoid) @@ -399,14 +432,16 @@ class SubscriptionManager(object): if fail in active_repos: self.log.debug("Repo %s is already enabled", fail) else: - self.log_warn("Repo {0} does not appear to " - "exist".format(fail)) + self.log_warn( + "Repo {0} does not appear to exist".format(fail) + ) if len(disable_list_fail) > 0: for fail in disable_list_fail: - self.log.debug("Repo %s not disabled " - "because it is not enabled", fail) + self.log.debug( + "Repo %s not disabled because it is not enabled", fail + ) - cmd = ['repos'] + cmd = ["repos"] if len(disable_list) > 0: cmd.extend(disable_list) @@ -420,11 +455,15 @@ class SubscriptionManager(object): return False if len(enable_list) > 0: - self.log.debug("Enabled the following repos: %s", - (", ".join(enable_list)).replace('--enable=', '')) + self.log.debug( + "Enabled the following repos: %s", + (", ".join(enable_list)).replace("--enable=", ""), + ) if len(disable_list) > 0: - self.log.debug("Disabled the following repos: %s", - (", ".join(disable_list)).replace('--disable=', '')) + self.log.debug( + "Disabled the following repos: %s", + (", ".join(disable_list)).replace("--disable=", ""), + ) return True def is_configured(self): @@ -432,13 +471,12 @@ class SubscriptionManager(object): def _sub_man_cli(cmd, logstring_val=False): - ''' + """ Uses the prefered cloud-init subprocess def of subp.subp and runs subscription-manager. Breaking this to a separate function for later use in mocking and unittests - ''' - return subp.subp(['subscription-manager'] + cmd, - logstring=logstring_val) + """ + return subp.subp(["subscription-manager"] + cmd, logstring=logstring_val) # vi: ts=4 expandtab |