diff options
Diffstat (limited to 'azurelinuxagent')
35 files changed, 2214 insertions, 608 deletions
diff --git a/azurelinuxagent/agent.py b/azurelinuxagent/agent.py index 1309d94..2c21e47 100644 --- a/azurelinuxagent/agent.py +++ b/azurelinuxagent/agent.py @@ -31,7 +31,7 @@ import azurelinuxagent.common.conf as conf from azurelinuxagent.common.version import AGENT_NAME, AGENT_LONG_VERSION, \ DISTRO_NAME, DISTRO_VERSION, \ PY_VERSION_MAJOR, PY_VERSION_MINOR, \ - PY_VERSION_MICRO + PY_VERSION_MICRO, GOAL_STATE_AGENT_VERSION from azurelinuxagent.common.osutil import get_osutil class Agent(object): @@ -91,6 +91,8 @@ class Agent(object): """ print("Register {0} service".format(AGENT_NAME)) self.osutil.register_agent_service() + print("Stop {0} service".format(AGENT_NAME)) + self.osutil.stop_agent_service() print("Start {0} service".format(AGENT_NAME)) self.osutil.start_agent_service() @@ -171,10 +173,14 @@ def version(): """ Show agent version """ - print(("{0} running on {1} {2}".format(AGENT_LONG_VERSION, DISTRO_NAME, - DISTRO_VERSION))) - print("Python: {0}.{1}.{2}".format(PY_VERSION_MAJOR, PY_VERSION_MINOR, + print(("{0} running on {1} {2}".format(AGENT_LONG_VERSION, + DISTRO_NAME, + DISTRO_VERSION))) + print("Python: {0}.{1}.{2}".format(PY_VERSION_MAJOR, + PY_VERSION_MINOR, PY_VERSION_MICRO)) + print("Goal state agent: {0}".format(GOAL_STATE_AGENT_VERSION)) + def usage(): """ Show agent usage diff --git a/azurelinuxagent/common/conf.py b/azurelinuxagent/common/conf.py index 1a3b0da..9c79d10 100644 --- a/azurelinuxagent/common/conf.py +++ b/azurelinuxagent/common/conf.py @@ -24,10 +24,12 @@ import os import azurelinuxagent.common.utils.fileutil as fileutil from azurelinuxagent.common.exception import AgentConfigError + class ConfigurationProvider(object): """ Parse amd store key:values in /etc/waagent.conf. 
""" + def __init__(self): self.values = dict() @@ -66,6 +68,7 @@ class ConfigurationProvider(object): __conf__ = ConfigurationProvider() + def load_conf_from_file(conf_file_path, conf=__conf__): """ Load conf file from: conf_file_path @@ -80,102 +83,143 @@ def load_conf_from_file(conf_file_path, conf=__conf__): raise AgentConfigError(("Failed to load conf file:{0}, {1}" "").format(conf_file_path, err)) + def enable_rdma(conf=__conf__): - return conf.get_switch("OS.EnableRDMA", False) + return conf.get_switch("OS.EnableRDMA", False) or \ + conf.get_switch("OS.UpdateRdmaDriver", False) or \ + conf.get_switch("OS.CheckRdmaDriver", False) + def get_logs_verbose(conf=__conf__): return conf.get_switch("Logs.Verbose", False) + def get_lib_dir(conf=__conf__): return conf.get("Lib.Dir", "/var/lib/waagent") + def get_dvd_mount_point(conf=__conf__): return conf.get("DVD.MountPoint", "/mnt/cdrom/secure") + def get_agent_pid_file_path(conf=__conf__): return conf.get("Pid.File", "/var/run/waagent.pid") + def get_ext_log_dir(conf=__conf__): return conf.get("Extension.LogDir", "/var/log/azure") + def get_openssl_cmd(conf=__conf__): return conf.get("OS.OpensslPath", "/usr/bin/openssl") + def get_home_dir(conf=__conf__): return conf.get("OS.HomeDir", "/home") + def get_passwd_file_path(conf=__conf__): return conf.get("OS.PasswordPath", "/etc/shadow") + def get_sudoers_dir(conf=__conf__): return conf.get("OS.SudoersDir", "/etc/sudoers.d") + def get_sshd_conf_file_path(conf=__conf__): return conf.get("OS.SshdConfigPath", "/etc/ssh/sshd_config") + def get_root_device_scsi_timeout(conf=__conf__): return conf.get("OS.RootDeviceScsiTimeout", None) + def get_ssh_host_keypair_type(conf=__conf__): return conf.get("Provisioning.SshHostKeyPairType", "rsa") + def get_provision_enabled(conf=__conf__): return conf.get_switch("Provisioning.Enabled", True) + def get_allow_reset_sys_user(conf=__conf__): return conf.get_switch("Provisioning.AllowResetSysUser", False) + def 
get_regenerate_ssh_host_key(conf=__conf__): return conf.get_switch("Provisioning.RegenerateSshHostKeyPair", False) + def get_delete_root_password(conf=__conf__): return conf.get_switch("Provisioning.DeleteRootPassword", False) + def get_decode_customdata(conf=__conf__): return conf.get_switch("Provisioning.DecodeCustomData", False) + def get_execute_customdata(conf=__conf__): return conf.get_switch("Provisioning.ExecuteCustomData", False) + def get_password_cryptid(conf=__conf__): return conf.get("Provisioning.PasswordCryptId", "6") + def get_password_crypt_salt_len(conf=__conf__): return conf.get_int("Provisioning.PasswordCryptSaltLength", 10) + def get_monitor_hostname(conf=__conf__): return conf.get_switch("Provisioning.MonitorHostName", False) + def get_httpproxy_host(conf=__conf__): return conf.get("HttpProxy.Host", None) + def get_httpproxy_port(conf=__conf__): - return conf.get("HttpProxy.Port", None) + return conf.get_int("HttpProxy.Port", None) + def get_detect_scvmm_env(conf=__conf__): return conf.get_switch("DetectScvmmEnv", False) + def get_resourcedisk_format(conf=__conf__): return conf.get_switch("ResourceDisk.Format", False) + def get_resourcedisk_enable_swap(conf=__conf__): return conf.get_switch("ResourceDisk.EnableSwap", False) + def get_resourcedisk_mountpoint(conf=__conf__): return conf.get("ResourceDisk.MountPoint", "/mnt/resource") + +def get_resourcedisk_mountoptions(conf=__conf__): + return conf.get("ResourceDisk.MountOptions", None) + + def get_resourcedisk_filesystem(conf=__conf__): return conf.get("ResourceDisk.Filesystem", "ext3") + def get_resourcedisk_swap_size_mb(conf=__conf__): return conf.get_int("ResourceDisk.SwapSizeMB", 0) + def get_autoupdate_gafamily(conf=__conf__): return conf.get("AutoUpdate.GAFamily", "Prod") + def get_autoupdate_enabled(conf=__conf__): return conf.get_switch("AutoUpdate.Enabled", True) + def get_autoupdate_frequency(conf=__conf__): return conf.get_int("Autoupdate.Frequency", 3600) +def 
get_enable_overprovisioning(conf=__conf__): + return conf.get_switch("EnableOverProvisioning", False)
\ No newline at end of file diff --git a/azurelinuxagent/common/event.py b/azurelinuxagent/common/event.py index 374b0e7..4037622 100644 --- a/azurelinuxagent/common/event.py +++ b/azurelinuxagent/common/event.py @@ -32,21 +32,24 @@ from azurelinuxagent.common.protocol.restapi import TelemetryEventParam, \ TelemetryEvent, \ set_properties, get_properties from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \ - DISTRO_CODE_NAME, AGENT_VERSION + DISTRO_CODE_NAME, AGENT_VERSION, \ + CURRENT_AGENT, CURRENT_VERSION class WALAEventOperation: + ActivateResourceDisk="ActivateResourceDisk" + Disable = "Disable" + Download = "Download" + Enable = "Enable" + HealthCheck = "HealthCheck" HeartBeat="HeartBeat" - Provision = "Provision" Install = "Install" + Provision = "Provision" + Restart="Restart" + UnhandledError="UnhandledError" UnInstall = "UnInstall" - Disable = "Disable" - Enable = "Enable" - Download = "Download" Upgrade = "Upgrade" Update = "Update" - ActivateResourceDisk="ActivateResourceDisk" - UnhandledError="UnhandledError" class EventLogger(object): def __init__(self): @@ -71,7 +74,7 @@ class EventLogger(object): except IOError as e: raise EventError("Failed to write events to file:{0}", e) - def add_event(self, name, op="", is_success=True, duration=0, version=AGENT_VERSION, + def add_event(self, name, op="", is_success=True, duration=0, version=CURRENT_VERSION, message="", evt_type="", is_internal=False): event = TelemetryEvent(1, "69B669B9-4AF8-4C50-BDC4-6006FA76E975") event.parameters.append(TelemetryEventParam('Name', name)) @@ -92,7 +95,7 @@ class EventLogger(object): __event_logger__ = EventLogger() -def add_event(name, op="", is_success=True, duration=0, version=AGENT_VERSION, +def add_event(name, op="", is_success=True, duration=0, version=CURRENT_VERSION, message="", evt_type="", is_internal=False, reporter=__event_logger__): log = logger.info if is_success else logger.error diff --git a/azurelinuxagent/common/osutil/alpine.py 
b/azurelinuxagent/common/osutil/alpine.py new file mode 100644 index 0000000..202d01d --- /dev/null +++ b/azurelinuxagent/common/osutil/alpine.py @@ -0,0 +1,51 @@ +# Microsoft Azure Linux Agent +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import azurelinuxagent.common.logger as logger +import azurelinuxagent.common.utils.shellutil as shellutil +from azurelinuxagent.common.osutil.default import DefaultOSUtil + +class AlpineOSUtil(DefaultOSUtil): + def __init__(self): + super(AlpineOSUtil, self).__init__() + self.agent_conf_file_path = '/etc/waagent.conf' + + def is_dhcp_enabled(self): + return True + + def get_dhcp_pid(self): + ret = shellutil.run_get_output('pidof dhcpcd', chk_err=False) + if ret[0] == 0: + logger.info('dhcpcd is pid {}'.format(ret[1])) + return ret[1].strip() + return None + + def restart_if(self, ifname): + logger.info('restarting {} (sort of, actually SIGHUPing dhcpcd)'.format(ifname)) + pid = self.get_dhcp_pid() + if pid != None: + ret = shellutil.run_get_output('kill -HUP {}'.format(pid)) + + def set_ssh_client_alive_interval(self): + # Alpine will handle this. + pass + + def conf_sshd(self, disable_password): + # Alpine will handle this. 
+ pass diff --git a/azurelinuxagent/common/osutil/bigip.py b/azurelinuxagent/common/osutil/bigip.py new file mode 100644 index 0000000..fea7aff --- /dev/null +++ b/azurelinuxagent/common/osutil/bigip.py @@ -0,0 +1,383 @@ +# Copyright 2016 F5 Networks Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import array +import fcntl +import os +import platform +import re +import socket +import struct +import time + +try: + # WAAgent > 2.1.3 + import azurelinuxagent.common.logger as logger + import azurelinuxagent.common.utils.shellutil as shellutil + + from azurelinuxagent.common.exception import OSUtilError + from azurelinuxagent.common.osutil.default import DefaultOSUtil +except ImportError: + # WAAgent <= 2.1.3 + import azurelinuxagent.logger as logger + import azurelinuxagent.utils.shellutil as shellutil + + from azurelinuxagent.exception import OSUtilError + from azurelinuxagent.distro.default.osutil import DefaultOSUtil + + +class BigIpOSUtil(DefaultOSUtil): + def __init__(self): + super(BigIpOSUtil, self).__init__() + + def _wait_until_mcpd_is_initialized(self): + """Wait for mcpd to become available + + All configuration happens in mcpd so we need to wait that this is + available before we go provisioning the system. I call this method + at the first opportunity I have (during the DVD mounting call). 
+ This ensures that the rest of the provisioning does not need to wait + for mcpd to be available unless it absolutely wants to. + + :return bool: Returns True upon success + :raises OSUtilError: Raises exception if mcpd does not come up within + roughly 50 minutes (100 * 30 seconds) + """ + for retries in range(1, 100): + # Retry until mcpd completes startup: + logger.info("Checking to see if mcpd is up") + rc = shellutil.run("/usr/bin/tmsh -a show sys mcp-state field-fmt 2>/dev/null | grep phase | grep running", chk_err=False) + if rc == 0: + logger.info("mcpd is up!") + break + time.sleep(30) + + if rc is 0: + return True + + raise OSUtilError( + "mcpd hasn't completed initialization! Cannot proceed!" + ) + + def _save_sys_config(self): + cmd = "/usr/bin/tmsh save sys config" + rc = shellutil.run(cmd) + if rc != 0: + logger.error("WARNING: Cannot save sys config on 1st boot.") + return rc + + def restart_ssh_service(self): + return shellutil.run("/usr/bin/bigstart restart sshd", chk_err=False) + + def stop_agent_service(self): + return shellutil.run("/sbin/service waagent stop", chk_err=False) + + def start_agent_service(self): + return shellutil.run("/sbin/service waagent start", chk_err=False) + + def register_agent_service(self): + return shellutil.run("/sbin/chkconfig --add waagent", chk_err=False) + + def unregister_agent_service(self): + return shellutil.run("/sbin/chkconfig --del waagent", chk_err=False) + + def get_dhcp_pid(self): + ret = shellutil.run_get_output("/sbin/pidof dhclient") + return ret[1] if ret[0] == 0 else None + + def set_hostname(self, hostname): + """Set the static hostname of the device + + Normally, tmsh is used to set the hostname for the system. For our + purposes at this time though, I would hesitate to trust this function. + + Azure(Stack) uses the name that you provide in the Web UI or ARM (for + example) as the value of the hostname argument to this method. 
The + problem is that there is nowhere in the UI that specifies the + restrictions and checks that tmsh has for the hostname. + + For example, if you set the name "bigip1" in the Web UI, Azure(Stack) + considers that a perfectly valid name. When WAAgent gets around to + running though, tmsh will reject that value because it is not a fully + qualified domain name. The proper value should have been bigip.xxx.yyy + + WAAgent will not fail if this command fails, but the hostname will not + be what the user set either. Currently we do not set the hostname when + WAAgent starts up, so I am passing on setting it here too. + + :param hostname: The hostname to set on the device + """ + return None + + def set_dhcp_hostname(self, hostname): + """Sets the DHCP hostname + + See `set_hostname` for an explanation of why I pass here + + :param hostname: The hostname to set on the device + """ + return None + + def useradd(self, username, expiration=None): + """Create user account using tmsh + + Our policy is to create two accounts when booting a BIG-IP instance. + The first account is the one that the user specified when they did + the instance creation. The second one is the admin account that is, + or should be, built in to the system. + + :param username: The username that you want to add to the system + :param expiration: The expiration date to use. We do not use this + value. 
+ """ + if self.get_userentry(username): + logger.info("User {0} already exists, skip useradd", username) + return None + + cmd = "/usr/bin/tmsh create auth user %s partition-access add { all-partitions { role admin } } shell bash" % (username) + retcode, out = shellutil.run_get_output(cmd, log_cmd=True, chk_err=True) + if retcode != 0: + raise OSUtilError( + "Failed to create user account:{0}, retcode:{1}, output:{2}".format(username, retcode, out) + ) + self._save_sys_config() + return retcode + + def chpasswd(self, username, password, crypt_id=6, salt_len=10): + """Change a user's password with tmsh + + Since we are creating the user specified account and additionally + changing the password of the built-in 'admin' account, both must + be modified in this method. + + Note that the default method also checks for a "system level" of the + user; based on the value of UID_MIN in /etc/login.defs. In our env, + all user accounts have the UID 0. So we can't rely on this value. + + :param username: The username whose password to change + :param password: The unencrypted password to set for the user + :param crypt_id: If encrypting the password, the crypt_id that was used + :param salt_len: If encrypting the password, the length of the salt + value used to do it. 
+ """ + + # Start by setting the password of the user provided account + cmd = "/usr/bin/tmsh modify auth user {0} password '{1}'".format(username, password) + ret, output = shellutil.run_get_output(cmd, log_cmd=False, chk_err=True) + if ret != 0: + raise OSUtilError( + "Failed to set password for {0}: {1}".format(username, output) + ) + + # Next, set the password of the built-in 'admin' account to be have + # the same password as the user provided account + userentry = self.get_userentry('admin') + if userentry is None: + raise OSUtilError("The 'admin' user account was not found!") + + cmd = "/usr/bin/tmsh modify auth user 'admin' password '{0}'".format(password) + ret, output = shellutil.run_get_output(cmd, log_cmd=False, chk_err=True) + if ret != 0: + raise OSUtilError( + "Failed to set password for 'admin': {0}".format(output) + ) + self._save_sys_config() + return ret + + def del_account(self, username): + """Deletes a user account. + + Note that the default method also checks for a "system level" of the + user; based on the value of UID_MIN in /etc/login.defs. In our env, + all user accounts have the UID 0. So we can't rely on this value. + + We also don't use sudo, so we remove that method call as well. + + :param username: + :return: + """ + shellutil.run("> /var/run/utmp") + shellutil.run("/usr/bin/tmsh delete auth user " + username) + + def get_dvd_device(self, dev_dir='/dev'): + """Find BIG-IP's CD/DVD device + + This device is almost certainly /dev/cdrom so I added the ? to this pattern. + Note that this method will return upon the first device found, but in my + tests with 12.1.1 it will also find /dev/sr0 on occasion. This is NOT the + correct CD/DVD device though. 
+ + :todo: Consider just always returning "/dev/cdrom" here if that device device + exists on all platforms that are supported on Azure(Stack) + :param dev_dir: The root directory from which to look for devices + """ + patten = r'(sr[0-9]|hd[c-z]|cdrom[0-9]?)' + for dvd in [re.match(patten, dev) for dev in os.listdir(dev_dir)]: + if dvd is not None: + return "/dev/{0}".format(dvd.group(0)) + raise OSUtilError("Failed to get dvd device") + + def mount_dvd(self, **kwargs): + """Mount the DVD containing the provisioningiso.iso file + + This is the _first_ hook that WAAgent provides for us, so this is the + point where we should wait for mcpd to load. I am just overloading + this method to add the mcpd wait. Then I proceed with the stock code. + + :param max_retry: Maximum number of retries waagent will make when + mounting the provisioningiso.iso DVD + :param chk_err: Whether to check for errors or not in the mounting + commands + """ + self._wait_until_mcpd_is_initialized() + return super(BigIpOSUtil, self).mount_dvd(**kwargs) + + def eject_dvd(self, chk_err=True): + """Runs the eject command to eject the provisioning DVD + + BIG-IP does not include an eject command. It is sufficient to just + umount the DVD disk. But I will log that we do not support this for + future reference. + + :param chk_err: Whether or not to check for errors raised by the eject + command + """ + logger.warn("Eject is not supported on this platform") + + def set_admin_access_to_ip(self, dest_ip): + """Sets admin access to an IP address + + This method is primarily used to limit which user account is allowed to + communicate with the Azure(Stack) metadata service. This service is at + the address 169.254.169.254 and includes information about the device + that "normal" users should not be allowed to see. + + We cannot use this iptables command that comes with the default class + because we do not ship the 'ipt_owner' iptables extension with BIG-IP. 
+ + This should not be a problem though as the only people who should have + access to BIG-IP are people who are root anyways. Our system is not + a "general purpose" user system. So for those reasons I am dropping + that requirement from our implementation. + + :param dest_ip: The IP address that you want to allow admin access for + """ + self._set_accept_admin_access_to_ip(dest_ip) + self._set_drop_admin_access_to_ip(dest_ip) + + def _set_accept_admin_access_to_ip(self, dest_ip): + """Sets the "accept" IP Tables rules + + I broke this out to a separate function so that I could more easily + test it in the tests/common/osutil/test_default.py code + + :param dest_ip: + :return: + """ + # This allows root to access dest_ip + rm_old = "iptables -D OUTPUT -d {0} -j ACCEPT" + rule = "iptables -A OUTPUT -d {0} -j ACCEPT" + shellutil.run(rm_old.format(dest_ip), chk_err=False) + shellutil.run(rule.format(dest_ip)) + + def _set_drop_admin_access_to_ip(self, dest_ip): + """Sets the "drop" IP Tables rules + + I broke this out to a separate function so that I could more easily + test it in the tests/common/osutil/test_default.py code + + :param dest_ip: + :return: + """ + # This blocks all other users to access dest_ip + rm_old = "iptables -D OUTPUT -d {0} -j DROP" + rule = "iptables -A OUTPUT -d {0} -j DROP" + shellutil.run(rm_old.format(dest_ip), chk_err=False) + shellutil.run(rule.format(dest_ip)) + + def get_first_if(self): + """Return the interface name, and ip addr of the management interface. + + We need to add a struct_size check here because, curiously, our 64bit + platform is identified by python in Azure(Stack) as 32 bit and without + adjusting the struct_size, we can't get the information we need. + + I believe this may be caused by only python i686 being shipped with + BIG-IP instead of python x86_64?? + """ + iface = '' + expected = 16 # how many devices should I expect... 
+ + python_arc = platform.architecture()[0] + if python_arc == '64bit': + struct_size = 40 # for 64bit the size is 40 bytes + else: + struct_size = 32 # for 32bit the size is 32 bytes + sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + buff = array.array('B', b'\0' * (expected * struct_size)) + param = struct.pack('iL', + expected*struct_size, + buff.buffer_info()[0]) + ret = fcntl.ioctl(sock.fileno(), 0x8912, param) + retsize = (struct.unpack('iL', ret)[0]) + if retsize == (expected * struct_size): + logger.warn(('SIOCGIFCONF returned more than {0} up ' + 'network interfaces.'), expected) + sock = buff.tostring() + for i in range(0, struct_size * expected, struct_size): + iface = self._format_single_interface_name(sock, i) + + # Azure public was returning "lo:1" when deploying WAF + if b'lo' in iface: + continue + else: + break + return iface.decode('latin-1'), socket.inet_ntoa(sock[i+20:i+24]) + + def _format_single_interface_name(self, sock, offset): + return sock[offset:offset+16].split(b'\0', 1)[0] + + def route_add(self, net, mask, gateway): + """Add specified route using tmsh. + + :param net: + :param mask: + :param gateway: + :return: + """ + cmd = ("/usr/bin/tmsh create net route " + "{0}/{1} gw {2}").format(net, mask, gateway) + return shellutil.run(cmd, chk_err=False) + + def device_for_ide_port(self, port_id): + """Return device name attached to ide port 'n'. + + Include a wait in here because BIG-IP may not have yet initialized + this list of devices. 
+ + :param port_id: + :return: + """ + for retries in range(1, 100): + # Retry until devices are ready + if os.path.exists("/sys/bus/vmbus/devices/"): + break + else: + time.sleep(10) + return super(BigIpOSUtil, self).device_for_ide_port(port_id) diff --git a/azurelinuxagent/common/osutil/clearlinux.py b/azurelinuxagent/common/osutil/clearlinux.py new file mode 100644 index 0000000..5116ff4 --- /dev/null +++ b/azurelinuxagent/common/osutil/clearlinux.py @@ -0,0 +1,91 @@ +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import os +import re +import pwd +import shutil +import socket +import array +import struct +import fcntl +import time +import base64 +import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.logger as logger +import azurelinuxagent.common.utils.fileutil as fileutil +import azurelinuxagent.common.utils.shellutil as shellutil +import azurelinuxagent.common.utils.textutil as textutil +from azurelinuxagent.common.osutil.default import DefaultOSUtil + +class ClearLinuxUtil(DefaultOSUtil): + def __init__(self): + super(ClearLinuxUtil, self).__init__() + self.agent_conf_file_path = '/usr/share/defaults/waagent/waagent.conf' + + def is_dhcp_enabled(self): + return True + + def start_network(self) : + return shellutil.run("systemctl start systemd-networkd", chk_err=False) + + def restart_if(self, iface): + shellutil.run("systemctl restart systemd-networkd") + + def restart_ssh_service(self): + # SSH is socket activated. No need to restart it. 
+ pass + + def stop_dhcp_service(self): + return shellutil.run("systemctl stop systemd-networkd", chk_err=False) + + def start_dhcp_service(self): + return shellutil.run("systemctl start systemd-networkd", chk_err=False) + + def start_agent_service(self): + return shellutil.run("systemctl start waagent", chk_err=False) + + def stop_agent_service(self): + return shellutil.run("systemctl stop waagent", chk_err=False) + + def get_dhcp_pid(self): + ret= shellutil.run_get_output("pidof systemd-networkd") + return ret[1] if ret[0] == 0 else None + + def conf_sshd(self, disable_password): + # Don't whack the system default sshd conf + pass + + def del_root_password(self): + try: + passwd_file_path = conf.get_passwd_file_path() + try: + passwd_content = fileutil.read_file(passwd_file_path) + if not passwd_content: + # Empty file is no better than no file + raise FileNotFoundError + except FileNotFoundError: + new_passwd = ["root:*LOCK*:14600::::::"] + else: + passwd = passwd_content.split('\n') + new_passwd = [x for x in passwd if not x.startswith("root:")] + new_passwd.insert(0, "root:*LOCK*:14600::::::") + fileutil.write_file(passwd_file_path, "\n".join(new_passwd)) + except IOError as e: + raise OSUtilError("Failed to delete root password:{0}".format(e)) + pass diff --git a/azurelinuxagent/common/osutil/coreos.py b/azurelinuxagent/common/osutil/coreos.py index e26fd97..9d4f9b8 100644 --- a/azurelinuxagent/common/osutil/coreos.py +++ b/azurelinuxagent/common/osutil/coreos.py @@ -17,27 +17,15 @@ # import os -import re -import pwd -import shutil -import socket -import array -import struct -import fcntl -import time -import base64 -import azurelinuxagent.common.logger as logger -import azurelinuxagent.common.utils.fileutil as fileutil import azurelinuxagent.common.utils.shellutil as shellutil -import azurelinuxagent.common.utils.textutil as textutil from azurelinuxagent.common.osutil.default import DefaultOSUtil class CoreOSUtil(DefaultOSUtil): def __init__(self): 
super(CoreOSUtil, self).__init__() self.agent_conf_file_path = '/usr/share/oem/waagent.conf' - self.waagent_path='/usr/share/oem/bin/waagent' - self.python_path='/usr/share/oem/python/bin' + self.waagent_path = '/usr/share/oem/bin/waagent' + self.python_path = '/usr/share/oem/python/bin' if 'PATH' in os.environ: path = "{0}:{1}".format(os.environ['PATH'], self.python_path) else: @@ -52,22 +40,22 @@ class CoreOSUtil(DefaultOSUtil): os.environ['PYTHONPATH'] = py_path def is_sys_user(self, username): - #User 'core' is not a sysuser - if username == 'core': - return False - return super(CoreOSUtil, self).is_sys_user(username) + # User 'core' is not a sysuser. + if username == 'core': + return False + return super(CoreOSUtil, self).is_sys_user(username) def is_dhcp_enabled(self): return True - def start_network(self) : + def start_network(self): return shellutil.run("systemctl start systemd-networkd", chk_err=False) - def restart_if(self, iface): + def restart_if(self, *dummy, **_): shellutil.run("systemctl restart systemd-networkd") def restart_ssh_service(self): - # SSH is socket activated on CoreOS. No need to restart it. + # SSH is socket activated on CoreOS. No need to restart it. 
pass def stop_dhcp_service(self): @@ -77,16 +65,17 @@ class CoreOSUtil(DefaultOSUtil): return shellutil.run("systemctl start systemd-networkd", chk_err=False) def start_agent_service(self): - return shellutil.run("systemctl start wagent", chk_err=False) + return shellutil.run("systemctl start waagent", chk_err=False) def stop_agent_service(self): - return shellutil.run("systemctl stop wagent", chk_err=False) + return shellutil.run("systemctl stop waagent", chk_err=False) def get_dhcp_pid(self): - ret= shellutil.run_get_output("pidof systemd-networkd") - return ret[1] if ret[0] == 0 else None + ret = shellutil.run_get_output("systemctl show -p MainPID " + "systemd-networkd", chk_err=False) + pid = ret[1].split('=', 1)[-1].strip() if ret[0] == 0 else None + return pid if pid != '0' else None def conf_sshd(self, disable_password): - #In CoreOS, /etc/sshd_config is mount readonly. Skip the setting + # In CoreOS, /etc/sshd_config is mount readonly. Skip the setting. pass - diff --git a/azurelinuxagent/common/osutil/default.py b/azurelinuxagent/common/osutil/default.py index c243c85..dc73379 100644 --- a/azurelinuxagent/common/osutil/default.py +++ b/azurelinuxagent/common/osutil/default.py @@ -16,6 +16,7 @@ # Requires Python 2.4+ and Openssl 1.0+ # +import multiprocessing import os import re import shutil @@ -247,16 +248,6 @@ class DefaultOSUtil(object): else: return False - def set_selinux_enforce(self, state): - """ - Calls shell command 'setenforce' with 'state' - and returns resulting exit code. - """ - if self.is_selinux_system(): - if state: s = '1' - else: s='0' - return shellutil.run("setenforce "+s) - def set_selinux_context(self, path, con): """ Calls shell 'chcon' with 'path' and 'con' context. 
@@ -649,7 +640,7 @@ class DefaultOSUtil(object): return shellutil.run(cmd, chk_err=False) def get_dhcp_pid(self): - ret= shellutil.run_get_output("pidof dhclient") + ret = shellutil.run_get_output("pidof dhclient", chk_err=False) return ret[1] if ret[0] == 0 else None def set_hostname(self, hostname): @@ -761,19 +752,11 @@ class DefaultOSUtil(object): return base64.b64decode(data) def get_total_mem(self): - cmd = "grep MemTotal /proc/meminfo |awk '{print $2}'" - ret = shellutil.run_get_output(cmd) - if ret[0] == 0: - return int(ret[1])/1024 - else: - raise OSUtilError("Failed to get total memory: {0}".format(ret[1])) + # Get total memory in bytes and divide by 1024**2 to get the valu in MB. + return os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024**2) def get_processor_cores(self): - ret = shellutil.run_get_output("grep 'processor.*:' /proc/cpuinfo |wc -l") - if ret[0] == 0: - return int(ret[1]) - else: - raise OSUtilError("Failed to get processor cores") + return multiprocessing.cpu_count() def set_admin_access_to_ip(self, dest_ip): #This allows root to access dest_ip diff --git a/azurelinuxagent/common/osutil/factory.py b/azurelinuxagent/common/osutil/factory.py index 5e8ae6e..2718ba1 100644 --- a/azurelinuxagent/common/osutil/factory.py +++ b/azurelinuxagent/common/osutil/factory.py @@ -21,6 +21,7 @@ from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \ DISTRO_FULL_NAME from .default import DefaultOSUtil +from .clearlinux import ClearLinuxUtil from .coreos import CoreOSUtil from .debian import DebianOSUtil from .freebsd import FreeBSDOSUtil @@ -28,9 +29,13 @@ from .redhat import RedhatOSUtil, Redhat6xOSUtil from .suse import SUSEOSUtil, SUSE11OSUtil from .ubuntu import UbuntuOSUtil, Ubuntu12OSUtil, Ubuntu14OSUtil, \ UbuntuSnappyOSUtil +from .alpine import AlpineOSUtil +from .bigip import BigIpOSUtil def get_osutil(distro_name=DISTRO_NAME, distro_version=DISTRO_VERSION, distro_full_name=DISTRO_FULL_NAME): + if distro_name == 
"clear linux software for intel architecture": + return ClearLinuxUtil() if distro_name == "ubuntu": if Version(distro_version) == Version("12.04") or \ Version(distro_version) == Version("12.10"): @@ -42,6 +47,10 @@ def get_osutil(distro_name=DISTRO_NAME, distro_version=DISTRO_VERSION, return UbuntuSnappyOSUtil() else: return UbuntuOSUtil() + if distro_name == "alpine": + return AlpineOSUtil() + if distro_name == "kali": + return DebianOSUtil() if distro_name == "coreos": return CoreOSUtil() if distro_name == "suse": @@ -62,8 +71,10 @@ def get_osutil(distro_name=DISTRO_NAME, distro_version=DISTRO_VERSION, return RedhatOSUtil() elif distro_name == "freebsd": return FreeBSDOSUtil() + elif distro_name == "bigip": + return BigIpOSUtil() else: - logger.warn("Unable to load distro implemetation for {0}.", distro_name) - logger.warn("Use default distro implemetation instead.") + logger.warn("Unable to load distro implementation for {0}.", distro_name) + logger.warn("Use default distro implementation instead.") return DefaultOSUtil() diff --git a/azurelinuxagent/common/osutil/freebsd.py b/azurelinuxagent/common/osutil/freebsd.py index ddf8db6..54c7452 100644 --- a/azurelinuxagent/common/osutil/freebsd.py +++ b/azurelinuxagent/common/osutil/freebsd.py @@ -22,7 +22,7 @@ import azurelinuxagent.common.utils.textutil as textutil import azurelinuxagent.common.logger as logger from azurelinuxagent.common.exception import OSUtilError from azurelinuxagent.common.osutil.default import DefaultOSUtil - +from azurelinuxagent.common.future import ustr class FreeBSDOSUtil(DefaultOSUtil): def __init__(self): @@ -118,7 +118,7 @@ class FreeBSDOSUtil(DefaultOSUtil): shellutil.run("route delete 255.255.255.255 -iface {0}".format(ifname), chk_err=False) def get_dhcp_pid(self): - ret = shellutil.run_get_output("pgrep -n dhclient") + ret = shellutil.run_get_output("pgrep -n dhclient", chk_err=False) return ret[1] if ret[0] == 0 else None def eject_dvd(self, chk_err=True): @@ -196,3 +196,50 @@ 
class FreeBSDOSUtil(DefaultOSUtil): logger.verbose("Interface info: ({0},{1},{2})", iface, inet, mac) return iface, inet, mac + + def device_for_ide_port(self, port_id): + """ + Return device name attached to ide port 'n'. + """ + if port_id > 3: + return None + g0 = "00000000" + if port_id > 1: + g0 = "00000001" + port_id = port_id - 2 + err, output = shellutil.run_get_output('sysctl dev.storvsc | grep pnpinfo | grep deviceid=') + if err: + return None + g1 = "000" + ustr(port_id) + g0g1 = "{0}-{1}".format(g0, g1) + """ + search 'X' from 'dev.storvsc.X.%pnpinfo: classid=32412632-86cb-44a2-9b5c-50d1417354f5 deviceid=00000000-0001-8899-0000-000000000000' + """ + cmd_search_ide = "sysctl dev.storvsc | grep pnpinfo | grep deviceid={0}".format(g0g1) + err, output = shellutil.run_get_output(cmd_search_ide) + if err: + return None + cmd_extract_id = cmd_search_ide + "|awk -F . '{print $3}'" + err, output = shellutil.run_get_output(cmd_extract_id) + """ + try to search 'blkvscX' and 'storvscX' to find device name + """ + output = output.rstrip() + cmd_search_blkvsc = "camcontrol devlist -b | grep blkvsc{0} | awk '{{print $1}}'".format(output) + err, output = shellutil.run_get_output(cmd_search_blkvsc) + if err == 0: + output = output.rstrip() + cmd_search_dev="camcontrol devlist | grep {0} | awk -F \( '{{print $2}}'|awk -F , '{{print $1}}'".format(output) + err, output = shellutil.run_get_output(cmd_search_dev) + if err == 0: + return output.rstrip() + + cmd_search_storvsc = "camcontrol devlist -b | grep storvsc{0} | awk '{{print $1}}'".format(output) + err, output = shellutil.run_get_output(cmd_search_storvsc) + if err == 0: + output = output.rstrip() + cmd_search_dev="camcontrol devlist | grep {0} | awk -F \( '{{print $2}}'|awk -F , '{{print $1}}'".format(output) + err, output = shellutil.run_get_output(cmd_search_dev) + if err == 0: + return output.rstrip() + return None diff --git a/azurelinuxagent/common/osutil/redhat.py b/azurelinuxagent/common/osutil/redhat.py 
index 03084b6..80370a2 100644 --- a/azurelinuxagent/common/osutil/redhat.py +++ b/azurelinuxagent/common/osutil/redhat.py @@ -69,7 +69,7 @@ class Redhat6xOSUtil(DefaultOSUtil): #Override def get_dhcp_pid(self): - ret= shellutil.run_get_output("pidof dhclient") + ret = shellutil.run_get_output("pidof dhclient", chk_err=False) return ret[1] if ret[0] == 0 else None def set_hostname(self, hostname): diff --git a/azurelinuxagent/common/osutil/suse.py b/azurelinuxagent/common/osutil/suse.py index f0ed0c0..60d6f28 100644 --- a/azurelinuxagent/common/osutil/suse.py +++ b/azurelinuxagent/common/osutil/suse.py @@ -42,7 +42,8 @@ class SUSE11OSUtil(DefaultOSUtil): shellutil.run("hostname {0}".format(hostname), chk_err=False) def get_dhcp_pid(self): - ret= shellutil.run_get_output("pidof {0}".format(self.dhclient_name)) + ret = shellutil.run_get_output("pidof {0}".format(self.dhclient_name), + chk_err=False) return ret[1] if ret[0] == 0 else None def is_dhcp_enabled(self): diff --git a/azurelinuxagent/common/osutil/ubuntu.py b/azurelinuxagent/common/osutil/ubuntu.py index 4032cf4..3c353cf 100644 --- a/azurelinuxagent/common/osutil/ubuntu.py +++ b/azurelinuxagent/common/osutil/ubuntu.py @@ -45,9 +45,9 @@ class Ubuntu12OSUtil(Ubuntu14OSUtil): def __init__(self): super(Ubuntu12OSUtil, self).__init__() - #Override + # Override def get_dhcp_pid(self): - ret= shellutil.run_get_output("pidof dhclient3") + ret = shellutil.run_get_output("pidof dhclient3", chk_err=False) return ret[1] if ret[0] == 0 else None class UbuntuOSUtil(Ubuntu14OSUtil): diff --git a/azurelinuxagent/common/protocol/hostplugin.py b/azurelinuxagent/common/protocol/hostplugin.py index 6569604..e83dd4b 100644 --- a/azurelinuxagent/common/protocol/hostplugin.py +++ b/azurelinuxagent/common/protocol/hostplugin.py @@ -22,19 +22,27 @@ from azurelinuxagent.common.utils import textutil HOST_PLUGIN_PORT = 32526 URI_FORMAT_GET_API_VERSIONS = "http://{0}:{1}/versions" +URI_FORMAT_GET_EXTENSION_ARTIFACT = 
"http://{0}:{1}/extensionArtifact" URI_FORMAT_PUT_VM_STATUS = "http://{0}:{1}/status" URI_FORMAT_PUT_LOG = "http://{0}:{1}/vmAgentLog" API_VERSION = "2015-09-01" - +HEADER_CONTAINER_ID = "x-ms-containerid" +HEADER_VERSION = "x-ms-version" +HEADER_HOST_CONFIG_NAME = "x-ms-host-config-name" +HEADER_ARTIFACT_LOCATION = "x-ms-artifact-location" +HEADER_ARTIFACT_MANIFEST_LOCATION = "x-ms-artifact-manifest-location" class HostPluginProtocol(object): - def __init__(self, endpoint): + def __init__(self, endpoint, container_id, role_config_name): if endpoint is None: raise ProtocolError("Host plugin endpoint not provided") self.is_initialized = False self.is_available = False self.api_versions = None self.endpoint = endpoint + self.container_id = container_id + self.role_config_name = role_config_name + self.manifest_uri = None def ensure_initialized(self): if not self.is_initialized: @@ -46,23 +54,48 @@ class HostPluginProtocol(object): def get_api_versions(self): url = URI_FORMAT_GET_API_VERSIONS.format(self.endpoint, HOST_PLUGIN_PORT) - logger.info("getting API versions at [{0}]".format(url)) + logger.verbose("getting API versions at [{0}]".format(url)) + return_val = [] try: - response = restutil.http_get(url) + headers = {HEADER_CONTAINER_ID: self.container_id} + response = restutil.http_get(url, headers) if response.status != httpclient.OK: logger.error( "get API versions returned status code [{0}]".format( response.status)) - return [] - return response.read() + else: + return_val = ustr(remove_bom(response.read()), encoding='utf-8') + except HttpError as e: logger.error("get API versions failed with [{0}]".format(e)) - return [] - def put_vm_status(self, status_blob, sas_url): + return return_val + + def get_artifact_request(self, artifact_url, artifact_manifest_url=None): + if not self.ensure_initialized(): + logger.error("host plugin channel is not available") + return + if textutil.is_str_none_or_whitespace(artifact_url): + logger.error("no extension artifact url 
was provided") + return + + url = URI_FORMAT_GET_EXTENSION_ARTIFACT.format(self.endpoint, + HOST_PLUGIN_PORT) + headers = {HEADER_VERSION: API_VERSION, + HEADER_CONTAINER_ID: self.container_id, + HEADER_HOST_CONFIG_NAME: self.role_config_name, + HEADER_ARTIFACT_LOCATION: artifact_url} + + if artifact_manifest_url is not None: + headers[HEADER_ARTIFACT_MANIFEST_LOCATION] = artifact_manifest_url + + return url, headers + + def put_vm_status(self, status_blob, sas_url, config_blob_type=None): """ Try to upload the VM status via the host plugin /status channel :param sas_url: the blob SAS url to pass to the host plugin + :param config_blob_type: the blob type from the extension config :type status_blob: StatusBlob """ if not self.ensure_initialized(): @@ -71,25 +104,30 @@ class HostPluginProtocol(object): if status_blob is None or status_blob.vm_status is None: logger.error("no status data was provided") return - url = URI_FORMAT_PUT_VM_STATUS.format(self.endpoint, HOST_PLUGIN_PORT) - status = textutil.b64encode(status_blob.vm_status) - headers = {"x-ms-version": API_VERSION} - blob_headers = [{'headerName': 'x-ms-version', - 'headerValue': status_blob.__storage_version__}, - {'headerName': 'x-ms-blob-type', - 'headerValue': status_blob.type}] - data = json.dumps({'requestUri': sas_url, 'headers': blob_headers, - 'content': status}, sort_keys=True) - logger.info("put VM status at [{0}]".format(url)) try: - response = restutil.http_put(url, data, headers) + url = URI_FORMAT_PUT_VM_STATUS.format(self.endpoint, HOST_PLUGIN_PORT) + logger.verbose("Posting VM status to host plugin") + status = textutil.b64encode(status_blob.data) + blob_type = status_blob.type if status_blob.type else config_blob_type + headers = {HEADER_VERSION: API_VERSION, + "Content-type": "application/json", + HEADER_CONTAINER_ID: self.container_id, + HEADER_HOST_CONFIG_NAME: self.role_config_name} + blob_headers = [{'headerName': 'x-ms-version', + 'headerValue': status_blob.__storage_version__}, + 
{'headerName': 'x-ms-blob-type', + 'headerValue': blob_type}] + data = json.dumps({'requestUri': sas_url, 'headers': blob_headers, + 'content': status}, sort_keys=True) + response = restutil.http_put(url, data=data, headers=headers) if response.status != httpclient.OK: - logger.error("put VM status returned status code [{0}]".format( - response.status)) - except HttpError as e: - logger.error("put VM status failed with [{0}]".format(e)) + logger.error("PUT failed [{0}]", response.status) + else: + logger.verbose("Successfully uploaded status to host plugin") + except Exception as e: + logger.error("Put VM status failed [{0}]", e) - def put_vm_log(self, content, container_id, deployment_id): + def put_vm_log(self, content): """ Try to upload the given content to the host plugin :param deployment_id: the deployment id, which is obtained from the @@ -102,18 +140,18 @@ class HostPluginProtocol(object): if not self.ensure_initialized(): logger.error("host plugin channel is not available") return - if content is None or container_id is None or deployment_id is None: + if content is None or self.goal_state.container_id is None or self.goal_state.deployment_id is None: logger.error( "invalid arguments passed: " "[{0}], [{1}], [{2}]".format( content, - container_id, - deployment_id)) + self.goal_state.container_id, + self.goal_state.deployment_id)) return url = URI_FORMAT_PUT_LOG.format(self.endpoint, HOST_PLUGIN_PORT) - headers = {"x-ms-vmagentlog-deploymentid": deployment_id, - "x-ms-vmagentlog-containerid": container_id} + headers = {"x-ms-vmagentlog-deploymentid": self.goal_state.deployment_id, + "x-ms-vmagentlog-containerid": self.goal_state.container_id} logger.info("put VM log at [{0}]".format(url)) try: response = restutil.http_put(url, content, headers) diff --git a/azurelinuxagent/common/protocol/metadata.py b/azurelinuxagent/common/protocol/metadata.py index f86f72f..c61e373 100644 --- a/azurelinuxagent/common/protocol/metadata.py +++ 
b/azurelinuxagent/common/protocol/metadata.py @@ -16,39 +16,42 @@ # # Requires Python 2.4+ and Openssl 1.0+ +import base64 import json -import shutil import os -import time -from azurelinuxagent.common.exception import ProtocolError, HttpError -from azurelinuxagent.common.future import httpclient, ustr +import shutil +import re import azurelinuxagent.common.conf as conf -import azurelinuxagent.common.logger as logger -import azurelinuxagent.common.utils.restutil as restutil -import azurelinuxagent.common.utils.textutil as textutil import azurelinuxagent.common.utils.fileutil as fileutil -from azurelinuxagent.common.utils.cryptutil import CryptUtil +import azurelinuxagent.common.utils.shellutil as shellutil +import azurelinuxagent.common.utils.textutil as textutil +from azurelinuxagent.common.future import httpclient from azurelinuxagent.common.protocol.restapi import * +from azurelinuxagent.common.utils.cryptutil import CryptUtil -METADATA_ENDPOINT='169.254.169.254' -APIVERSION='2015-05-01-preview' +METADATA_ENDPOINT = '169.254.169.254' +APIVERSION = '2015-05-01-preview' BASE_URI = "http://{0}/Microsoft.Compute/{1}?api-version={2}{3}" TRANSPORT_PRV_FILE_NAME = "V2TransportPrivate.pem" TRANSPORT_CERT_FILE_NAME = "V2TransportCert.pem" +P7M_FILE_NAME = "Certificates.p7m" +P7B_FILE_NAME = "Certificates.p7b" +PEM_FILE_NAME = "Certificates.pem" -#TODO remote workarround for azure stack +# TODO remote workaround for azure stack MAX_PING = 30 RETRY_PING_INTERVAL = 10 + def _add_content_type(headers): if headers is None: headers = {} headers["content-type"] = "application/json" return headers -class MetadataProtocol(Protocol): +class MetadataProtocol(Protocol): def __init__(self, apiversion=APIVERSION, endpoint=METADATA_ENDPOINT): self.apiversion = apiversion self.endpoint = endpoint @@ -65,11 +68,12 @@ class MetadataProtocol(Protocol): self.apiversion, "") self.vm_status_uri = BASE_URI.format(self.endpoint, "status/vmagent", self.apiversion, "") - self.ext_status_uri = 
BASE_URI.format(self.endpoint, + self.ext_status_uri = BASE_URI.format(self.endpoint, "status/extensions/{0}", self.apiversion, "") self.event_uri = BASE_URI.format(self.endpoint, "status/telemetry", self.apiversion, "") + self.certs = None def _get_data(self, url, headers=None): try: @@ -82,13 +86,12 @@ class MetadataProtocol(Protocol): data = resp.read() etag = resp.getheader('ETag') - if data is None: - return None - data = json.loads(ustr(data, encoding="utf-8")) + if data is not None: + data = json.loads(ustr(data, encoding="utf-8")) return data, etag def _put_data(self, url, data, headers=None): - headers = _add_content_type(headers) + headers = _add_content_type(headers) try: resp = restutil.http_put(url, json.dumps(data), headers=headers) except HttpError as e: @@ -97,16 +100,16 @@ class MetadataProtocol(Protocol): raise ProtocolError("{0} - PUT: {1}".format(resp.status, url)) def _post_data(self, url, data, headers=None): - headers = _add_content_type(headers) + headers = _add_content_type(headers) try: resp = restutil.http_post(url, json.dumps(data), headers=headers) except HttpError as e: raise ProtocolError(ustr(e)) if resp.status != httpclient.CREATED: raise ProtocolError("{0} - POST: {1}".format(resp.status, url)) - + def _get_trans_cert(self): - trans_crt_file = os.path.join(conf.get_lib_dir(), + trans_crt_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME) if not os.path.isfile(trans_crt_file): raise ProtocolError("{0} is missing.".format(trans_crt_file)) @@ -115,22 +118,22 @@ class MetadataProtocol(Protocol): def detect(self): self.get_vminfo() - trans_prv_file = os.path.join(conf.get_lib_dir(), + trans_prv_file = os.path.join(conf.get_lib_dir(), TRANSPORT_PRV_FILE_NAME) - trans_cert_file = os.path.join(conf.get_lib_dir(), + trans_cert_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME) cryptutil = CryptUtil(conf.get_openssl_cmd()) cryptutil.gen_transport_cert(trans_prv_file, trans_cert_file) - #"Install" the cert and 
private key to /var/lib/waagent + # "Install" the cert and private key to /var/lib/waagent thumbprint = cryptutil.get_thumbprint_from_crt(trans_cert_file) - prv_file = os.path.join(conf.get_lib_dir(), + prv_file = os.path.join(conf.get_lib_dir(), "{0}.prv".format(thumbprint)) - crt_file = os.path.join(conf.get_lib_dir(), + crt_file = os.path.join(conf.get_lib_dir(), "{0}.crt".format(thumbprint)) shutil.copyfile(trans_prv_file, prv_file) shutil.copyfile(trans_cert_file, crt_file) - + self.update_goal_state(forced=True) def get_vminfo(self): vminfo = VMInfo() @@ -139,18 +142,42 @@ class MetadataProtocol(Protocol): return vminfo def get_certs(self): - #TODO download and save certs - return CertList() + certlist = CertList() + certificatedata = CertificateData() + data, etag = self._get_data(self.cert_uri) + + set_properties("certlist", certlist, data) + + cert_list = get_properties(certlist) + + headers = { + "x-ms-vmagent-public-x509-cert": self._get_trans_cert() + } + + for cert_i in cert_list["certificates"]: + certificate_data_uri = cert_i['certificateDataUri'] + data, etag = self._get_data(certificate_data_uri, headers=headers) + set_properties("certificatedata", certificatedata, data) + json_certificate_data = get_properties(certificatedata) + + self.certs = Certificates(self, json_certificate_data) + + if self.certs is None: + return None + return self.certs def get_vmagent_manifests(self, last_etag=None): manifests = VMAgentManifestList() + self.update_goal_state() data, etag = self._get_data(self.vmagent_uri) - if last_etag == None or last_etag < etag: - set_properties("vmAgentManifests", manifests.vmAgentManifests, data) + if last_etag is None or last_etag < etag: + set_properties("vmAgentManifests", + manifests.vmAgentManifests, + data) return manifests, etag def get_vmagent_pkgs(self, vmagent_manifest): - #Agent package is the same with extension handler + # Agent package is the same with extension handler vmagent_pkgs = ExtHandlerPackageList() data = None 
for manifest_uri in vmagent_manifest.versionsManifestUris: @@ -168,27 +195,35 @@ class MetadataProtocol(Protocol): return vmagent_pkgs def get_ext_handlers(self, last_etag=None): + self.update_goal_state() headers = { "x-ms-vmagent-public-x509-cert": self._get_trans_cert() } ext_list = ExtHandlerList() data, etag = self._get_data(self.ext_uri, headers=headers) - if last_etag == None or last_etag < etag: + if last_etag is None or last_etag < etag: set_properties("extensionHandlers", ext_list.extHandlers, data) return ext_list, etag def get_ext_handler_pkgs(self, ext_handler): - ext_handler_pkgs = ExtHandlerPackageList() - data = None + logger.info("Get extension handler packages") + pkg_list = ExtHandlerPackageList() + + manifest = None for version_uri in ext_handler.versionUris: try: - data, etag = self._get_data(version_uri.uri) + manifest, etag = self._get_data(version_uri.uri) + logger.info("Successfully downloaded manifest") break except ProtocolError as e: - logger.warn("Failed to get version uris: {0}", e) - logger.info("Retry getting version uris") - set_properties("extensionPackages", ext_handler_pkgs, data) - return ext_handler_pkgs + logger.warn("Failed to fetch manifest: {0}", e) + + if manifest is None: + raise ValueError("Extension manifest is empty") + + set_properties("extensionPackages", pkg_list, manifest) + + return pkg_list def report_provision_status(self, provision_status): validate_param('provisionStatus', provision_status, ProvisionStatus) @@ -198,7 +233,8 @@ class MetadataProtocol(Protocol): def report_vm_status(self, vm_status): validate_param('vmStatus', vm_status, VMStatus) data = get_properties(vm_status) - #TODO code field is not implemented for metadata protocol yet. Remove it + # TODO code field is not implemented for metadata protocol yet. 
+ # Remove it handler_statuses = data['vmAgent']['extensionHandlers'] for handler_status in handler_statuses: try: @@ -215,9 +251,134 @@ class MetadataProtocol(Protocol): self._put_data(uri, data) def report_event(self, events): - #TODO disable telemetry for azure stack test - #validate_param('events', events, TelemetryEventList) - #data = get_properties(events) - #self._post_data(self.event_uri, data) + # TODO disable telemetry for azure stack test + # validate_param('events', events, TelemetryEventList) + # data = get_properties(events) + # self._post_data(self.event_uri, data) pass + def update_certs(self): + certificates = self.get_certs() + return certificates.cert_list + + def update_goal_state(self, forced=False, max_retry=3): + # Start updating goalstate, retry on 410 + for retry in range(0, max_retry): + try: + self.update_certs() + return + except: + logger.verbose("Incarnation is out of date. Update goalstate.") + raise ProtocolError("Exceeded max retry updating goal state") + + +class Certificates(object): + """ + Object containing certificates of host and provisioned user. + """ + + def __init__(self, client, json_text): + self.cert_list = CertList() + self.parse(json_text) + + def parse(self, json_text): + """ + Parse multiple certificates into seperate files. + """ + + data = json_text["certificateData"] + if data is None: + logger.verbose("No data in json_text received!") + return + + cryptutil = CryptUtil(conf.get_openssl_cmd()) + p7b_file = os.path.join(conf.get_lib_dir(), P7B_FILE_NAME) + + # Wrapping the certificate lines. 
+ # decode and save the result into p7b_file + fileutil.write_file(p7b_file, base64.b64decode(data), asbin=True) + + ssl_cmd = "openssl pkcs7 -text -in {0} -inform der | grep -v '^-----' " + ret, data = shellutil.run_get_output(ssl_cmd.format(p7b_file)) + + p7m_file = os.path.join(conf.get_lib_dir(), P7M_FILE_NAME) + p7m = ("MIME-Version:1.0\n" + "Content-Disposition: attachment; filename=\"{0}\"\n" + "Content-Type: application/x-pkcs7-mime; name=\"{1}\"\n" + "Content-Transfer-Encoding: base64\n" + "\n" + "{2}").format(p7m_file, p7m_file, data) + + self.save_cache(p7m_file, p7m) + + trans_prv_file = os.path.join(conf.get_lib_dir(), + TRANSPORT_PRV_FILE_NAME) + trans_cert_file = os.path.join(conf.get_lib_dir(), + TRANSPORT_CERT_FILE_NAME) + pem_file = os.path.join(conf.get_lib_dir(), PEM_FILE_NAME) + # decrypt certificates + cryptutil.decrypt_p7m(p7m_file, trans_prv_file, trans_cert_file, + pem_file) + + # The parsing process use public key to match prv and crt. + buf = [] + begin_crt = False + begin_prv = False + prvs = {} + thumbprints = {} + index = 0 + v1_cert_list = [] + with open(pem_file) as pem: + for line in pem.readlines(): + buf.append(line) + if re.match(r'[-]+BEGIN.*KEY[-]+', line): + begin_prv = True + elif re.match(r'[-]+BEGIN.*CERTIFICATE[-]+', line): + begin_crt = True + elif re.match(r'[-]+END.*KEY[-]+', line): + tmp_file = self.write_to_tmp_file(index, 'prv', buf) + pub = cryptutil.get_pubkey_from_prv(tmp_file) + prvs[pub] = tmp_file + buf = [] + index += 1 + begin_prv = False + elif re.match(r'[-]+END.*CERTIFICATE[-]+', line): + tmp_file = self.write_to_tmp_file(index, 'crt', buf) + pub = cryptutil.get_pubkey_from_crt(tmp_file) + thumbprint = cryptutil.get_thumbprint_from_crt(tmp_file) + thumbprints[pub] = thumbprint + # Rename crt with thumbprint as the file name + crt = "{0}.crt".format(thumbprint) + v1_cert_list.append({ + "name": None, + "thumbprint": thumbprint + }) + os.rename(tmp_file, os.path.join(conf.get_lib_dir(), crt)) + buf = [] + 
index += 1 + begin_crt = False + + # Rename prv key with thumbprint as the file name + for pubkey in prvs: + thumbprint = thumbprints[pubkey] + if thumbprint: + tmp_file = prvs[pubkey] + prv = "{0}.prv".format(thumbprint) + os.rename(tmp_file, os.path.join(conf.get_lib_dir(), prv)) + + for v1_cert in v1_cert_list: + cert = Cert() + set_properties("certs", cert, v1_cert) + self.cert_list.certificates.append(cert) + + def save_cache(self, local_file, data): + try: + fileutil.write_file(local_file, data) + except IOError as e: + raise ProtocolError("Failed to write cache: {0}".format(e)) + + def write_to_tmp_file(self, index, suffix, buf): + file_name = os.path.join(conf.get_lib_dir(), + "{0}.{1}".format(index, suffix)) + self.save_cache(file_name, "".join(buf)) + return file_name diff --git a/azurelinuxagent/common/protocol/restapi.py b/azurelinuxagent/common/protocol/restapi.py index 7f00488..5f71cf2 100644 --- a/azurelinuxagent/common/protocol/restapi.py +++ b/azurelinuxagent/common/protocol/restapi.py @@ -16,15 +16,12 @@ # # Requires Python 2.4+ and Openssl 1.0+ # -import os -import copy -import re -import json -import xml.dom.minidom + import azurelinuxagent.common.logger as logger +import azurelinuxagent.common.utils.restutil as restutil from azurelinuxagent.common.exception import ProtocolError, HttpError from azurelinuxagent.common.future import ustr -import azurelinuxagent.common.utils.restutil as restutil + def validate_param(name, val, expected_type): if val is None: @@ -33,13 +30,14 @@ def validate_param(name, val, expected_type): raise ProtocolError(("{0} type should be {1} not {2}" "").format(name, expected_type, type(val))) + def set_properties(name, obj, data): if isinstance(obj, DataContract): validate_param("Property '{0}'".format(name), data, dict) for prob_name, prob_val in data.items(): prob_full_name = "{0}.{1}".format(name, prob_name) try: - prob = getattr(obj, prob_name) + prob = getattr(obj, prob_name) except AttributeError: 
logger.warn("Unknown property: {0}", prob_full_name) continue @@ -56,6 +54,7 @@ def set_properties(name, obj, data): else: return data + def get_properties(obj): if isinstance(obj, DataContract): data = {} @@ -72,16 +71,21 @@ def get_properties(obj): else: return obj + class DataContract(object): pass + class DataContractList(list): def __init__(self, item_cls): self.item_cls = item_cls + """ Data contract between guest and host """ + + class VMInfo(DataContract): def __init__(self, subscriptionId=None, vmName=None, containerId=None, roleName=None, roleInstanceName=None, tenantName=None): @@ -92,30 +96,40 @@ class VMInfo(DataContract): self.roleInstanceName = roleInstanceName self.tenantName = tenantName +class CertificateData(DataContract): + def __init__(self, certificateData=None): + self.certificateData = certificateData + class Cert(DataContract): - def __init__(self, name=None, thumbprint=None, certificateDataUri=None): + def __init__(self, name=None, thumbprint=None, certificateDataUri=None, storeName=None, storeLocation=None): self.name = name self.thumbprint = thumbprint self.certificateDataUri = certificateDataUri + self.storeLocation = storeLocation + self.storeName = storeName class CertList(DataContract): def __init__(self): self.certificates = DataContractList(Cert) -#TODO: confirm vmagent manifest schema + +# TODO: confirm vmagent manifest schema class VMAgentManifestUri(DataContract): def __init__(self, uri=None): self.uri = uri + class VMAgentManifest(DataContract): def __init__(self, family=None): self.family = family self.versionsManifestUris = DataContractList(VMAgentManifestUri) + class VMAgentManifestList(DataContract): def __init__(self): self.vmAgentManifests = DataContractList(VMAgentManifest) + class Extension(DataContract): def __init__(self, name=None, sequenceNumber=None, publicSettings=None, protectedSettings=None, certificateThumbprint=None): @@ -125,6 +139,7 @@ class Extension(DataContract): self.protectedSettings = protectedSettings 
self.certificateThumbprint = certificateThumbprint + class ExtHandlerProperties(DataContract): def __init__(self): self.version = None @@ -132,40 +147,49 @@ class ExtHandlerProperties(DataContract): self.state = None self.extensions = DataContractList(Extension) + class ExtHandlerVersionUri(DataContract): def __init__(self): self.uri = None + class ExtHandler(DataContract): def __init__(self, name=None): self.name = name self.properties = ExtHandlerProperties() self.versionUris = DataContractList(ExtHandlerVersionUri) + class ExtHandlerList(DataContract): def __init__(self): self.extHandlers = DataContractList(ExtHandler) + class ExtHandlerPackageUri(DataContract): def __init__(self, uri=None): self.uri = uri + class ExtHandlerPackage(DataContract): - def __init__(self, version = None): + def __init__(self, version=None): self.version = version self.uris = DataContractList(ExtHandlerPackageUri) # TODO update the naming to align with metadata protocol self.isinternal = False + self.disallow_major_upgrade = False + class ExtHandlerPackageList(DataContract): def __init__(self): self.versions = DataContractList(ExtHandlerPackage) + class VMProperties(DataContract): def __init__(self, certificateThumbprint=None): - #TODO need to confirm the property name + # TODO need to confirm the property name self.certificateThumbprint = certificateThumbprint + class ProvisionStatus(DataContract): def __init__(self, status=None, subStatus=None, description=None): self.status = status @@ -173,6 +197,7 @@ class ProvisionStatus(DataContract): self.description = description self.properties = VMProperties() + class ExtensionSubStatus(DataContract): def __init__(self, name=None, status=None, code=None, message=None): self.name = name @@ -180,6 +205,7 @@ class ExtensionSubStatus(DataContract): self.code = code self.message = message + class ExtensionStatus(DataContract): def __init__(self, configurationAppliedTime=None, operation=None, status=None, seq_no=None, code=None, message=None): @@ 
-191,8 +217,9 @@ class ExtensionStatus(DataContract): self.message = message self.substatusList = DataContractList(ExtensionSubStatus) + class ExtHandlerStatus(DataContract): - def __init__(self, name=None, version=None, status=None, code=0, + def __init__(self, name=None, version=None, status=None, code=0, message=None): self.name = name self.version = version @@ -201,6 +228,7 @@ class ExtHandlerStatus(DataContract): self.message = message self.extensions = DataContractList(ustr) + class VMAgentStatus(DataContract): def __init__(self, version=None, status=None, message=None): self.version = version @@ -208,27 +236,31 @@ class VMAgentStatus(DataContract): self.message = message self.extensionHandlers = DataContractList(ExtHandlerStatus) + class VMStatus(DataContract): def __init__(self): self.vmAgent = VMAgentStatus() + class TelemetryEventParam(DataContract): def __init__(self, name=None, value=None): self.name = name self.value = value + class TelemetryEvent(DataContract): def __init__(self, eventId=None, providerId=None): self.eventId = eventId self.providerId = providerId self.parameters = DataContractList(TelemetryEventParam) + class TelemetryEventList(DataContract): def __init__(self): self.events = DataContractList(TelemetryEvent) -class Protocol(DataContract): +class Protocol(DataContract): def detect(self): raise NotImplementedError() @@ -240,8 +272,8 @@ class Protocol(DataContract): def get_vmagent_manifests(self): raise NotImplementedError() - - def get_vmagent_pkgs(self): + + def get_vmagent_pkgs(self, manifest): raise NotImplementedError() def get_ext_handlers(self): @@ -250,13 +282,16 @@ class Protocol(DataContract): def get_ext_handler_pkgs(self, extension): raise NotImplementedError() - def download_ext_handler_pkg(self, uri): + def get_artifacts_profile(self): + raise NotImplementedError() + + def download_ext_handler_pkg(self, uri, headers=None): try: - resp = restutil.http_get(uri, chk_proxy=True) + resp = restutil.http_get(uri, chk_proxy=True, 
headers=headers) if resp.status == restutil.httpclient.OK: return resp.read() - except HttpError as e: - raise ProtocolError("Failed to download from: {0}".format(uri), e) + except Exception as e: + logger.warn("Failed to download from: {0}".format(uri), e) def report_provision_status(self, provision_status): raise NotImplementedError() @@ -269,4 +304,3 @@ class Protocol(DataContract): def report_event(self, event): raise NotImplementedError() - diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py index 29a1663..9f634e9 100644 --- a/azurelinuxagent/common/protocol/wire.py +++ b/azurelinuxagent/common/protocol/wire.py @@ -16,14 +16,18 @@ # # Requires Python 2.4+ and Openssl 1.0+ +import json +import os +import re import time import xml.sax.saxutils as saxutils import azurelinuxagent.common.conf as conf from azurelinuxagent.common.exception import ProtocolNotFoundError from azurelinuxagent.common.future import httpclient, bytebuffer -from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, findtext, \ - getattrib, gettext, remove_bom, get_bytes_from_pem +from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \ + findtext, getattrib, gettext, remove_bom, get_bytes_from_pem, parse_json import azurelinuxagent.common.utils.fileutil as fileutil +import azurelinuxagent.common.utils.textutil as textutil from azurelinuxagent.common.utils.cryptutil import CryptUtil from azurelinuxagent.common.protocol.restapi import * from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol @@ -66,10 +70,13 @@ class WireProtocol(Protocol): """Slim layer to adapt wire protocol data to metadata protocol interface""" # TODO: Clean-up goal state processing - # At present, some methods magically update GoalState (e.g., get_vmagent_manifests), others (e.g., get_vmagent_pkgs) - # assume its presence. 
A better approach would make an explicit update call that returns the incarnation number and - # establishes that number the "context" for all other calls (either by updating the internal state of the protocol or - # by having callers pass the incarnation number to the method). + # At present, some methods magically update GoalState (e.g., + # get_vmagent_manifests), others (e.g., get_vmagent_pkgs) + # assume its presence. A better approach would make an explicit update + # call that returns the incarnation number and + # establishes that number the "context" for all other calls (either by + # updating the internal state of the protocol or + # by having callers pass the incarnation number to the method). def __init__(self, endpoint): if endpoint is None: @@ -133,6 +140,22 @@ class WireProtocol(Protocol): man = self.client.get_ext_manifest(ext_handler, goal_state) return man.pkg_list + def get_artifacts_profile(self): + logger.verbose("Get In-VM Artifacts Profile") + return self.client.get_artifacts_profile() + + def download_ext_handler_pkg(self, uri, headers=None): + package = super(WireProtocol, self).download_ext_handler_pkg(uri) + + if package is not None: + return package + else: + logger.warn("Download did not succeed, falling back to host plugin") + host = self.client.get_host_plugin() + uri, headers = host.get_artifact_request(uri, host.manifest_uri) + package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers) + return package + def report_provision_status(self, provision_status): validate_param("provision_status", provision_status, ProvisionStatus) @@ -269,7 +292,7 @@ def ext_status_to_v1(ext_name, ext_status): "timestampUTC": timestamp } if len(v1_sub_status) != 0: - v1_ext_status['substatus'] = v1_sub_status + v1_ext_status['status']['substatus'] = v1_sub_status return v1_ext_status @@ -348,8 +371,8 @@ class StatusBlob(object): # TODO upload extension only if content has changed logger.verbose("Upload status blob") 
upload_successful = False - self.type = self.get_blob_type(url) self.data = self.to_json() + self.type = self.get_blob_type(url) try: if self.type == "BlockBlob": self.put_block_blob(url, self.data) @@ -365,41 +388,45 @@ class StatusBlob(object): return upload_successful def get_blob_type(self, url): - # Check blob type - logger.verbose("Check blob type.") + logger.verbose("Get blob type") timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) try: - resp = self.client.call_storage_service(restutil.http_head, url, { - "x-ms-date": timestamp, - 'x-ms-version': self.__class__.__storage_version__ - }) + resp = self.client.call_storage_service( + restutil.http_head, + url, + { + "x-ms-date": timestamp, + "x-ms-version": self.__class__.__storage_version__ + }) except HttpError as e: - raise ProtocolError((u"Failed to get status blob type: {0}" - u"").format(e)) + raise ProtocolError("Failed to get status blob type: {0}", e) + if resp is None or resp.status != httpclient.OK: - raise ProtocolError(("Failed to get status blob type: {0}" - "").format(resp.status)) + raise ProtocolError("Failed to get status blob type") blob_type = resp.getheader("x-ms-blob-type") - logger.verbose("Blob type={0}".format(blob_type)) + logger.verbose("Blob type: [{0}]", blob_type) return blob_type def put_block_blob(self, url, data): - logger.verbose("Upload block blob") + logger.verbose("Put block blob") timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - resp = self.client.call_storage_service(restutil.http_put, url, data, - { - "x-ms-date": timestamp, - "x-ms-blob-type": "BlockBlob", - "Content-Length": ustr(len(data)), - "x-ms-version": self.__class__.__storage_version__ - }) + resp = self.client.call_storage_service( + restutil.http_put, + url, + data, + { + "x-ms-date": timestamp, + "x-ms-blob-type": "BlockBlob", + "Content-Length": ustr(len(data)), + "x-ms-version": self.__class__.__storage_version__ + }) if resp.status != httpclient.CREATED: raise UploadError( 
"Failed to upload block blob: {0}".format(resp.status)) def put_page_blob(self, url, data): - logger.verbose("Replace old page blob") + logger.verbose("Put page blob") # Convert string into bytes data = bytearray(data, encoding='utf-8') @@ -407,14 +434,17 @@ class StatusBlob(object): # Align to 512 bytes page_blob_size = int((len(data) + 511) / 512) * 512 - resp = self.client.call_storage_service(restutil.http_put, url, "", - { - "x-ms-date": timestamp, - "x-ms-blob-type": "PageBlob", - "Content-Length": "0", - "x-ms-blob-content-length": ustr(page_blob_size), - "x-ms-version": self.__class__.__storage_version__ - }) + resp = self.client.call_storage_service( + restutil.http_put, + url, + "", + { + "x-ms-date": timestamp, + "x-ms-blob-type": "PageBlob", + "Content-Length": "0", + "x-ms-blob-content-length": ustr(page_blob_size), + "x-ms-version": self.__class__.__storage_version__ + }) if resp.status != httpclient.CREATED: raise UploadError( "Failed to clean up page blob: {0}".format(resp.status)) @@ -437,7 +467,9 @@ class StatusBlob(object): buf = bytearray(buf_size) buf[0: content_size] = data[start: end] resp = self.client.call_storage_service( - restutil.http_put, url, bytebuffer(buf), + restutil.http_put, + url, + bytebuffer(buf), { "x-ms-date": timestamp, "x-ms-range": "bytes={0}-{1}".format(start, page_end - 1), @@ -465,7 +497,8 @@ def event_param_to_v1(param): attr_type = 'mt:bool' elif param_type is float: attr_type = 'mt:float64' - return param_format.format(param.name, saxutils.quoteattr(ustr(param.value)), + return param_format.format(param.name, + saxutils.quoteattr(ustr(param.value)), attr_type) @@ -491,8 +524,9 @@ class WireClient(object): self.ext_conf = None self.last_request = 0 self.req_count = 0 + self.host_plugin = None self.status_blob = StatusBlob(self) - self.host_plugin = HostPluginProtocol(self.endpoint) + self.status_blob_type_reported = False def prevent_throttling(self): """ @@ -501,37 +535,42 @@ class WireClient(object): now = 
time.time() if now - self.last_request < 1: logger.verbose("Last request issued less than 1 second ago") - logger.verbose("Sleep {0} second to avoid throttling.", - SHORT_WAITING_INTERVAL) + logger.verbose("Sleep {0} second to avoid throttling.", + SHORT_WAITING_INTERVAL) time.sleep(SHORT_WAITING_INTERVAL) self.last_request = now self.req_count += 1 if self.req_count % 3 == 0: - logger.verbose("Sleep {0} second to avoid throttling.", - SHORT_WAITING_INTERVAL) + logger.verbose("Sleep {0} second to avoid throttling.", + SHORT_WAITING_INTERVAL) time.sleep(SHORT_WAITING_INTERVAL) self.req_count = 0 def call_wireserver(self, http_req, *args, **kwargs): """ - Call wire server. Handle throttling(403) and Resource Gone(410) + Call wire server; handle throttling (403), resource gone (410) and + service unavailable (503). """ self.prevent_throttling() for retry in range(0, 3): resp = http_req(*args, **kwargs) if resp.status == httpclient.FORBIDDEN: - logger.warn("Sending too much request to wire server") - logger.info("Sleep {0} second to avoid throttling.", + logger.warn("Sending too many requests to wire server. 
") + logger.info("Sleeping {0}s to avoid throttling.", LONG_WAITING_INTERVAL) time.sleep(LONG_WAITING_INTERVAL) + elif resp.status == httpclient.SERVICE_UNAVAILABLE: + logger.warn("Service temporarily unavailable, sleeping {0}s " + "before retrying.", LONG_WAITING_INTERVAL) + time.sleep(LONG_WAITING_INTERVAL) elif resp.status == httpclient.GONE: msg = args[0] if len(args) > 0 else "" raise WireProtocolResourceGone(msg) else: return resp - raise ProtocolError(("Calling wire server failed: {0}" - "").format(resp.status)) + raise ProtocolError(("Calling wire server failed: " + "{0}").format(resp.status)) def decode_config(self, data): if data is None: @@ -542,12 +581,13 @@ class WireClient(object): def fetch_config(self, uri, headers): try: - resp = self.call_wireserver(restutil.http_get, uri, + resp = self.call_wireserver(restutil.http_get, + uri, headers=headers) except HttpError as e: raise ProtocolError(ustr(e)) - if (resp.status != httpclient.OK): + if resp.status != httpclient.OK: raise ProtocolError("{0} - {1}".format(resp.status, uri)) return self.decode_config(resp.read()) @@ -566,41 +606,65 @@ class WireClient(object): except IOError as e: raise ProtocolError("Failed to write cache: {0}".format(e)) - def call_storage_service(self, http_req, *args, **kwargs): + @staticmethod + def call_storage_service(http_req, *args, **kwargs): """ Call storage service, handle SERVICE_UNAVAILABLE(503) """ + + # force the chk_proxy arg to True, since all calls to storage should + # use a configured proxy + kwargs['chk_proxy'] = True + for retry in range(0, 3): resp = http_req(*args, **kwargs) if resp.status == httpclient.SERVICE_UNAVAILABLE: - logger.warn("Storage service is not avaible temporaryly") - logger.info("Will retry later, in {0} seconds", + logger.warn("Storage service is temporarily unavailable. ") + logger.info("Will retry in {0} seconds. 
", LONG_WAITING_INTERVAL) time.sleep(LONG_WAITING_INTERVAL) else: return resp - raise ProtocolError(("Calling storage endpoint failed: {0}" - "").format(resp.status)) + raise ProtocolError(("Calling storage endpoint failed: " + "{0}").format(resp.status)) def fetch_manifest(self, version_uris): - for version_uri in version_uris: - logger.verbose("Fetch ext handler manifest: {0}", version_uri.uri) - try: - resp = self.call_storage_service(restutil.http_get, - version_uri.uri, None, - chk_proxy=True) - except HttpError as e: - raise ProtocolError(ustr(e)) - + logger.verbose("Fetch manifest") + for version in version_uris: + response = self.fetch(version.uri) + if not response: + logger.verbose("Manifest could not be downloaded, falling back to host plugin") + host = self.get_host_plugin() + uri, headers = host.get_artifact_request(version.uri) + response = self.fetch(uri, headers) + if not response: + logger.info("Retry fetch in {0} seconds", + LONG_WAITING_INTERVAL) + time.sleep(LONG_WAITING_INTERVAL) + else: + host.manifest_uri = version.uri + logger.verbose("Manifest downloaded successfully from host plugin") + if response: + return response + raise ProtocolError("Failed to fetch manifest from all sources") + + def fetch(self, uri, headers=None): + logger.verbose("Fetch [{0}] with headers [{1}]", uri, headers) + return_value = None + try: + resp = self.call_storage_service( + restutil.http_get, + uri, + headers) if resp.status == httpclient.OK: - return self.decode_config(resp.read()) - logger.warn("Failed to fetch ExtensionManifest: {0}, {1}", - resp.status, version_uri.uri) - logger.info("Will retry later, in {0} seconds", - LONG_WAITING_INTERVAL) - time.sleep(LONG_WAITING_INTERVAL) - raise ProtocolError(("Failed to fetch ExtensionManifest from " - "all sources")) + return_value = self.decode_config(resp.read()) + else: + logger.warn("Could not fetch {0} [{1}]", + uri, + resp.status) + except (HttpError, ProtocolError) as e: + logger.verbose("Fetch failed from 
[{0}]", uri) + return return_value def update_hosting_env(self, goal_state): if goal_state.hosting_env_uri is None: @@ -640,6 +704,7 @@ class WireClient(object): xml_text = self.fetch_config(goal_state.ext_uri, self.get_header()) self.save_cache(local_file, xml_text) self.ext_conf = ExtensionsConfig(xml_text) + self.status_blob_type_reported = False def update_goal_state(self, forced=False, max_retry=3): uri = GOAL_STATE_URI.format(self.endpoint) @@ -671,6 +736,9 @@ class WireClient(object): self.update_shared_conf(goal_state) self.update_certs(goal_state) self.update_ext_conf(goal_state) + if self.host_plugin is not None: + self.host_plugin.container_id = goal_state.container_id + self.host_plugin.role_config_name = goal_state.role_config_name return except WireProtocolResourceGone: logger.info("Incarnation is out of date. Update goalstate.") @@ -680,7 +748,7 @@ class WireClient(object): raise ProtocolError("Exceeded max retry updating goal state") def get_goal_state(self): - if (self.goal_state is None): + if self.goal_state is None: incarnation_file = os.path.join(conf.get_lib_dir(), INCARNATION_FILE_NAME) incarnation = self.fetch_cache(incarnation_file) @@ -693,14 +761,16 @@ class WireClient(object): def get_hosting_env(self): if (self.hosting_env is None): - local_file = os.path.join(conf.get_lib_dir(), HOSTING_ENV_FILE_NAME) + local_file = os.path.join(conf.get_lib_dir(), + HOSTING_ENV_FILE_NAME) xml_text = self.fetch_cache(local_file) self.hosting_env = HostingEnv(xml_text) return self.hosting_env def get_shared_conf(self): if (self.shared_conf is None): - local_file = os.path.join(conf.get_lib_dir(), SHARED_CONF_FILE_NAME) + local_file = os.path.join(conf.get_lib_dir(), + SHARED_CONF_FILE_NAME) xml_text = self.fetch_cache(local_file) self.shared_conf = SharedConfig(xml_text) return self.shared_conf @@ -715,7 +785,7 @@ class WireClient(object): return self.certs def get_ext_conf(self): - if (self.ext_conf is None): + if self.ext_conf is None: goal_state = 
self.get_goal_state() if goal_state.ext_uri is None: self.ext_conf = ExtensionsConfig(None) @@ -724,6 +794,7 @@ class WireClient(object): local_file = os.path.join(conf.get_lib_dir(), local_file) xml_text = self.fetch_cache(local_file) self.ext_conf = ExtensionsConfig(xml_text) + self.status_blob_type_reported = False return self.ext_conf def get_ext_manifest(self, ext_handler, goal_state): @@ -761,9 +832,42 @@ class WireClient(object): def upload_status_blob(self): ext_conf = self.get_ext_conf() if ext_conf.status_upload_blob is not None: - if not self.status_blob.upload(ext_conf.status_upload_blob): - self.host_plugin.put_vm_status(self.status_blob, - ext_conf.status_upload_blob) + uploaded = False + try: + uploaded = self.status_blob.upload(ext_conf.status_upload_blob) + self.report_blob_type(self.status_blob.type, + ext_conf.status_upload_blob_type) + except (HttpError, ProtocolError) as e: + # errors have already been logged + pass + if not uploaded: + host = self.get_host_plugin() + host.put_vm_status(self.status_blob, + ext_conf.status_upload_blob, + ext_conf.status_upload_blob_type) + + """ + Emit an event to determine if the type in the extension config + matches the actual type from the HTTP HEAD request. 
+ """ + def report_blob_type(self, head_type, config_type): + if head_type and config_type: + is_match = head_type == config_type + if self.status_blob_type_reported is False: + message = \ + 'Blob type match [{0}]'.format(head_type) if is_match else \ + 'Blob type mismatch [HEAD {0}], [CONFIG {1}]'.format( + head_type, + config_type) + + from azurelinuxagent.common.event import add_event, WALAEventOperation + from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION + add_event(AGENT_NAME, + version=CURRENT_VERSION, + is_success=is_match, + message=message, + op=WALAEventOperation.HealthCheck) + self.status_blob_type_reported = True def report_role_prop(self, thumbprint): goal_state = self.get_goal_state() @@ -774,14 +878,17 @@ class WireClient(object): role_prop_uri = ROLE_PROP_URI.format(self.endpoint) headers = self.get_header_for_xml_content() try: - resp = self.call_wireserver(restutil.http_post, role_prop_uri, - role_prop, headers=headers) + resp = self.call_wireserver(restutil.http_post, + role_prop_uri, + role_prop, + headers=headers) except HttpError as e: - raise ProtocolError((u"Failed to send role properties: {0}" - u"").format(e)) + raise ProtocolError((u"Failed to send role properties: " + u"{0}").format(e)) if resp.status != httpclient.ACCEPTED: - raise ProtocolError((u"Failed to send role properties: {0}" - u", {1}").format(resp.status, resp.read())) + raise ProtocolError((u"Failed to send role properties: " + u",{0}: {1}").format(resp.status, + resp.read())) def report_health(self, status, substatus, description): goal_state = self.get_goal_state() @@ -795,14 +902,21 @@ class WireClient(object): health_report_uri = HEALTH_REPORT_URI.format(self.endpoint) headers = self.get_header_for_xml_content() try: - resp = self.call_wireserver(restutil.http_post, health_report_uri, - health_report, headers=headers, max_retry=8) + # 30 retries with 10s sleep gives ~5min for wireserver updates; + # this is retried 3 times with 15s sleep before 
throwing a + # ProtocolError, for a total of ~15min. + resp = self.call_wireserver(restutil.http_post, + health_report_uri, + health_report, + headers=headers, + max_retry=30) except HttpError as e: - raise ProtocolError((u"Failed to send provision status: {0}" - u"").format(e)) + raise ProtocolError((u"Failed to send provision status: " + u"{0}").format(e)) if resp.status != httpclient.OK: - raise ProtocolError((u"Failed to send provision status: {0}" - u", {1}").format(resp.status, resp.read())) + raise ProtocolError((u"Failed to send provision status: " + u",{0}: {1}").format(resp.status, + resp.read())) def send_event(self, provider_id, event_str): uri = TELEMETRY_URI.format(self.endpoint) @@ -820,7 +934,8 @@ class WireClient(object): if resp.status != httpclient.OK: logger.verbose(resp.read()) - raise ProtocolError("Failed to send events:{0}".format(resp.status)) + raise ProtocolError( + "Failed to send events:{0}".format(resp.status)) def report_event(self, event_list): buf = {} @@ -867,6 +982,38 @@ class WireClient(object): "x-ms-guest-agent-public-x509-cert": cert } + def get_host_plugin(self): + if self.host_plugin is None: + goal_state = self.get_goal_state() + self.host_plugin = HostPluginProtocol(self.endpoint, + goal_state.container_id, + goal_state.role_config_name) + return self.host_plugin + + def has_artifacts_profile_blob(self): + return self.ext_conf and not \ + textutil.is_str_none_or_whitespace(self.ext_conf.artifacts_profile_blob) + + def get_artifacts_profile(self): + artifacts_profile = None + if self.has_artifacts_profile_blob(): + blob = self.ext_conf.artifacts_profile_blob + logger.info("Getting the artifacts profile") + profile = self.fetch(blob) + + if profile is None: + logger.warn("Download failed, falling back to host plugin") + host = self.get_host_plugin() + uri, headers = host.get_artifact_request(blob) + profile = self.decode_config(self.fetch(uri, headers)) + + if not textutil.is_str_none_or_whitespace(profile): + 
logger.info("Artifacts profile downloaded successfully") + artifacts_profile = InVMArtifactsProfile(profile) + + return artifacts_profile + + class VersionInfo(object): def __init__(self, xml_text): """ @@ -880,14 +1027,16 @@ class VersionInfo(object): xml_doc = parse_doc(xml_text) preferred = find(xml_doc, "Preferred") self.preferred = findtext(preferred, "Version") - logger.info("Fabric preferred wire protocol version:{0}", self.preferred) + logger.info("Fabric preferred wire protocol version:{0}", + self.preferred) self.supported = [] supported = find(xml_doc, "Supported") supported_version = findall(supported, "Version") for node in supported_version: version = gettext(node) - logger.verbose("Fabric supported wire protocol version:{0}", version) + logger.verbose("Fabric supported wire protocol version:{0}", + version) self.supported.append(version) def get_preferred(self): @@ -909,8 +1058,10 @@ class GoalState(object): self.certs_uri = None self.ext_uri = None self.role_instance_id = None + self.role_config_name = None self.container_id = None self.load_balancer_probe_port = None + self.xml_text = None self.parse(xml_text) def parse(self, xml_text): @@ -927,6 +1078,8 @@ class GoalState(object): self.ext_uri = findtext(xml_doc, "ExtensionsConfig") role_instance = find(xml_doc, "RoleInstance") self.role_instance_id = findtext(role_instance, "InstanceId") + role_config = find(role_instance, "Configuration") + self.role_config_name = findtext(role_config, "ConfigName") container = find(xml_doc, "Container") self.container_id = findtext(container, "ContainerId") lbprobe_ports = find(xml_doc, "LBProbePorts") @@ -947,6 +1100,7 @@ class HostingEnv(object): self.vm_name = None self.role_name = None self.deployment_name = None + self.xml_text = None self.parse(xml_text) def parse(self, xml_text): @@ -980,6 +1134,7 @@ class SharedConfig(object): # Not used currently return self + class Certificates(object): """ Object containing certificates of host and provisioned user. 
@@ -1089,6 +1244,8 @@ class ExtensionsConfig(object): self.ext_handlers = ExtHandlerList() self.vmagent_manifests = VMAgentManifestList() self.status_upload_blob = None + self.status_upload_blob_type = None + self.artifacts_profile_blob = None if xml_text is not None: self.parse(xml_text) @@ -1123,6 +1280,13 @@ class ExtensionsConfig(object): self.parse_plugin_settings(ext_handler, plugin_settings) self.status_upload_blob = findtext(xml_doc, "StatusUploadBlob") + self.artifacts_profile_blob = findtext(xml_doc, "InVMArtifactsProfileBlob") + + status_upload_node = find(xml_doc, "StatusUploadBlob") + self.status_upload_blob_type = getattrib(status_upload_node, + "statusBlobType") + logger.verbose("Extension config shows status blob type as [{0}]", + self.status_upload_blob_type) def parse_plugin(self, plugin): ext_handler = ExtHandler() @@ -1176,7 +1340,8 @@ class ExtensionsConfig(object): ext.sequenceNumber = seqNo ext.publicSettings = handler_settings.get("publicSettings") ext.protectedSettings = handler_settings.get("protectedSettings") - thumbprint = handler_settings.get("protectedSettingsCertThumbprint") + thumbprint = handler_settings.get( + "protectedSettingsCertThumbprint") ext.certificateThumbprint = thumbprint ext_handler.properties.extensions.append(ext) @@ -1191,14 +1356,21 @@ class ExtensionManifest(object): def parse(self, xml_text): xml_doc = parse_doc(xml_text) - self._handle_packages(findall(find(xml_doc, "Plugins"), "Plugin"), False) - self._handle_packages(findall(find(xml_doc, "InternalPlugins"), "Plugin"), True) + self._handle_packages(findall(find(xml_doc, + "Plugins"), + "Plugin"), + False) + self._handle_packages(findall(find(xml_doc, + "InternalPlugins"), + "Plugin"), + True) def _handle_packages(self, packages, isinternal): for package in packages: version = findtext(package, "Version") - disallow_major_upgrade = findtext(package, "DisallowMajorVersionUpgrade") + disallow_major_upgrade = findtext(package, + "DisallowMajorVersionUpgrade") if 
disallow_major_upgrade is None: disallow_major_upgrade = '' disallow_major_upgrade = disallow_major_upgrade.lower() == "true" @@ -1216,3 +1388,27 @@ class ExtensionManifest(object): pkg.isinternal = isinternal self.pkg_list.versions.append(pkg) + + +# Do not extend this class +class InVMArtifactsProfile(object): + """ + deserialized json string of InVMArtifactsProfile. + It is expected to contain the following fields: + * inVMArtifactsProfileBlobSeqNo + * profileId (optional) + * onHold (optional) + * certificateThumbprint (optional) + * encryptedHealthChecks (optional) + * encryptedApplicationProfile (optional) + """ + + def __init__(self, artifacts_profile): + if not textutil.is_str_none_or_whitespace(artifacts_profile): + self.__dict__.update(parse_json(artifacts_profile)) + + def is_on_hold(self): + # hasattr() is not available in Python 2.6 + if 'onHold' in self.__dict__: + return self.onHold.lower() == 'true' + return False diff --git a/azurelinuxagent/common/rdma.py b/azurelinuxagent/common/rdma.py index 0c17e38..ba9a029 100644 --- a/azurelinuxagent/common/rdma.py +++ b/azurelinuxagent/common/rdma.py @@ -86,8 +86,7 @@ class RDMAHandler(object): driver_info_source = '/var/lib/hyperv/.kvp_pool_0' base_kernel_err_msg = 'Kernel does not provide the necessary ' - base_kernel_err_msg += 'information or the hv_kvp_daemon is not ' - base_kernel_err_msg += 'running.' + base_kernel_err_msg += 'information or the kvp daemon is not running.' if not os.path.isfile(driver_info_source): error_msg = 'RDMA: Source file "%s" does not exist. ' error_msg += base_kernel_err_msg @@ -110,6 +109,26 @@ class RDMAHandler(object): logger.error(error_msg % driver_info_source) return + @staticmethod + def is_kvp_daemon_running(): + """Look for kvp daemon names in ps -ef output and return True/False + """ + # for centos, the hypervkvpd and the hv_kvp_daemon both are ok. 
+ # for suse, it uses hv_kvp_daemon + kvp_daemon_names = ['hypervkvpd', 'hv_kvp_daemon'] + + exitcode, ps_out = shellutil.run_get_output("ps -ef") + if exitcode != 0: + raise Exception('RDMA: ps -ef failed: %s' % ps_out) + for n in kvp_daemon_names: + if n in ps_out: + logger.info('RDMA: kvp daemon (%s) is running' % n) + return True + else: + logger.verbose('RDMA: kvp daemon (%s) is not running' % n) + return False + + def load_driver_module(self): """Load the kernel driver, this depends on the proper driver to be installed with the install_driver() method""" @@ -178,12 +197,33 @@ class RDMADeviceHandler(object): threading.Thread(target=self.process).start() def process(self): - RDMADeviceHandler.wait_rdma_device( - self.rdma_dev, self.device_check_timeout_sec, self.device_check_interval_sec) - RDMADeviceHandler.update_dat_conf(dapl_config_paths, self.ipv4_addr) - RDMADeviceHandler.write_rdma_config_to_device( - self.rdma_dev, self.ipv4_addr, self.mac_addr) - RDMADeviceHandler.update_network_interface(self.mac_addr, self.ipv4_addr) + try: + RDMADeviceHandler.update_dat_conf(dapl_config_paths, self.ipv4_addr) + + skip_rdma_device = False + retcode,out = shellutil.run_get_output("modinfo hv_network_direct") + if retcode == 0: + version = re.search("version:\s+(\d+)\.(\d+)\.(\d+)\D", out, re.IGNORECASE) + if version: + v1 = int(version.groups(0)[0]) + v2 = int(version.groups(0)[1]) + if v1>4 or v1==4 and v2>0: + logger.info("Skip setting /dev/hvnd_rdma on 4.1 or later") + skip_rdma_device = True + else: + logger.info("RDMA: hv_network_direct driver version not present, assuming 4.0.x or older.") + else: + logger.warn("RDMA: failed to get module info on hv_network_direct.") + + if not skip_rdma_device: + RDMADeviceHandler.wait_rdma_device( + self.rdma_dev, self.device_check_timeout_sec, self.device_check_interval_sec) + RDMADeviceHandler.write_rdma_config_to_device( + self.rdma_dev, self.ipv4_addr, self.mac_addr) + + 
RDMADeviceHandler.update_network_interface(self.mac_addr, self.ipv4_addr) + except Exception as e: + logger.error("RDMA: device processing failed: {0}".format(e)) @staticmethod def update_dat_conf(paths, ipv4_addr): @@ -221,6 +261,7 @@ class RDMADeviceHandler(object): logger.info( "RDMA: Updating device with configuration: {0}".format(data)) with open(path, "w") as f: + logger.info("RDMA: Device opened for writing") f.write(data) logger.info("RDMA: Updated device with IPv4/MAC addr successfully") diff --git a/azurelinuxagent/common/utils/fileutil.py b/azurelinuxagent/common/utils/fileutil.py index 7ef4fef..b0b6fb7 100644 --- a/azurelinuxagent/common/utils/fileutil.py +++ b/azurelinuxagent/common/utils/fileutil.py @@ -21,11 +21,11 @@ File operation util functions """ +import glob import os import re import shutil import pwd -import tempfile import azurelinuxagent.common.logger as logger from azurelinuxagent.common.future import ustr import azurelinuxagent.common.utils.textutil as textutil @@ -111,9 +111,11 @@ def chmod(path, mode): os.chmod(path, mode) def rm_files(*args): - for path in args: - if os.path.isfile(path): - os.remove(path) + for paths in args: + #Find all possible file paths + for path in glob.glob(paths): + if os.path.isfile(path): + os.remove(path) def rm_dirs(*args): """ @@ -169,3 +171,12 @@ def findstr_in_file(file_path, pattern_str): return None +def get_all_files(root_path): + """ + Find all files under the given root path + """ + result = [] + for root, dirs, files in os.walk(root_path): + result.extend([os.path.join(root, file) for file in files]) + + return result diff --git a/azurelinuxagent/common/utils/restutil.py b/azurelinuxagent/common/utils/restutil.py index a789650..7c9ee17 100644 --- a/azurelinuxagent/common/utils/restutil.py +++ b/azurelinuxagent/common/utils/restutil.py @@ -18,9 +18,7 @@ # import time -import platform -import os -import subprocess + import azurelinuxagent.common.conf as conf import azurelinuxagent.common.logger as 
logger from azurelinuxagent.common.exception import HttpError @@ -32,6 +30,7 @@ REST api util functions RETRY_WAITING_INTERVAL = 10 + def _parse_url(url): o = urlparse(url) rel_uri = o.path @@ -44,6 +43,7 @@ def _parse_url(url): secure = True return o.hostname, o.port, secure, rel_uri + def get_http_proxy(): """ Get http_proxy and https_proxy from environment variables. @@ -51,106 +51,143 @@ def get_http_proxy(): """ host = conf.get_httpproxy_host() port = conf.get_httpproxy_port() - return (host, port) + return host, port + def _http_request(method, host, rel_uri, port=None, data=None, secure=False, - headers=None, proxy_host=None, proxy_port=None): + headers=None, proxy_host=None, proxy_port=None): url, conn = None, None if secure: port = 443 if port is None else port if proxy_host is not None and proxy_port is not None: - conn = httpclient.HTTPSConnection(proxy_host, proxy_port, timeout=10) + conn = httpclient.HTTPSConnection(proxy_host, + proxy_port, + timeout=10) conn.set_tunnel(host, port) - #If proxy is used, full url is needed. + # If proxy is used, full url is needed. url = "https://{0}:{1}{2}".format(host, port, rel_uri) else: - conn = httpclient.HTTPSConnection(host, port, timeout=10) + conn = httpclient.HTTPSConnection(host, + port, + timeout=10) url = rel_uri else: port = 80 if port is None else port if proxy_host is not None and proxy_port is not None: - conn = httpclient.HTTPConnection(proxy_host, proxy_port, timeout=10) - #If proxy is used, full url is needed. + conn = httpclient.HTTPConnection(proxy_host, + proxy_port, + timeout=10) + # If proxy is used, full url is needed. 
url = "http://{0}:{1}{2}".format(host, port, rel_uri) else: - conn = httpclient.HTTPConnection(host, port, timeout=10) + conn = httpclient.HTTPConnection(host, + port, + timeout=10) url = rel_uri - if headers == None: - conn.request(method, url, data) - else: - conn.request(method, url, data, headers) + + logger.verbose("HTTPConnection [{0}] [{1}] [{2}] [{3}]", + method, + url, + data, + headers) + + headers = {} if headers is None else headers + conn.request(method=method, url=url, body=data, headers=headers) resp = conn.getresponse() return resp -def http_request(method, url, data, headers=None, max_retry=3, chk_proxy=False): + +def http_request(method, url, data, headers=None, max_retry=3, + chk_proxy=False): """ Sending http request to server On error, sleep 10 and retry max_retry times. """ - logger.verbose("HTTP Req: {0} {1}", method, url) - logger.verbose(" Data={0}", data) - logger.verbose(" Header={0}", headers) host, port, secure, rel_uri = _parse_url(url) - #Check proxy + # Check proxy proxy_host, proxy_port = (None, None) if chk_proxy: proxy_host, proxy_port = get_http_proxy() - #If httplib module is not built with ssl support. Fallback to http + # If httplib module is not built with ssl support. Fallback to http if secure and not hasattr(httpclient, "HTTPSConnection"): logger.warn("httplib is not built with ssl support") secure = False - #If httplib module doesn't support https tunnelling. Fallback to http - if secure and \ - proxy_host is not None and \ - proxy_port is not None and \ - not hasattr(httpclient.HTTPSConnection, "set_tunnel"): - logger.warn("httplib doesn't support https tunnelling(new in python 2.7)") + # If httplib module doesn't support https tunnelling. 
Fallback to http + if secure and proxy_host is not None and proxy_port is not None \ + and not hasattr(httpclient.HTTPSConnection, "set_tunnel"): + logger.warn("httplib does not support https tunnelling " + "(new in python 2.7)") secure = False + logger.verbose("HTTP method: [{0}]", method) + logger.verbose("HTTP host: [{0}]", host) + logger.verbose("HTTP uri: [{0}]", rel_uri) + logger.verbose("HTTP port: [{0}]", port) + logger.verbose("HTTP data: [{0}]", data) + logger.verbose("HTTP secure: [{0}]", secure) + logger.verbose("HTTP headers: [{0}]", headers) + logger.verbose("HTTP proxy: [{0}:{1}]", proxy_host, proxy_port) + + retry_msg = '' + log_msg = "HTTP {0}".format(method) for retry in range(0, max_retry): + retry_interval = RETRY_WAITING_INTERVAL try: - resp = _http_request(method, host, rel_uri, port=port, data=data, - secure=secure, headers=headers, - proxy_host=proxy_host, proxy_port=proxy_port) - logger.verbose("HTTP Resp: Status={0}", resp.status) - logger.verbose(" Header={0}", resp.getheaders()) + resp = _http_request(method, + host, + rel_uri, + port=port, + data=data, + secure=secure, + headers=headers, + proxy_host=proxy_host, + proxy_port=proxy_port) + logger.verbose("HTTP response status: [{0}]", resp.status) return resp except httpclient.HTTPException as e: - logger.warn('HTTPException {0}, args:{1}', e, repr(e.args)) + retry_msg = 'HTTP exception: {0} {1}'.format(log_msg, e) + retry_interval = 5 except IOError as e: - logger.warn('Socket IOError {0}, args:{1}', e, repr(e.args)) - - if retry < max_retry - 1: - logger.info("Retry={0}, {1} {2}", retry, method, url) - time.sleep(RETRY_WAITING_INTERVAL) - - if url is not None and len(url) > 100: - url_log = url[0: 100] #In case the url is too long - else: - url_log = url - raise HttpError("HTTP Err: {0} {1}".format(method, url_log)) + retry_msg = 'IO error: {0} {1}'.format(log_msg, e) + retry_interval = 0 + max_retry = 0 + + if retry < max_retry: + logger.info("Retry [{0}/{1} - {3}]", + retry+1, + 
max_retry, + retry_interval, + retry_msg) + time.sleep(retry_interval) + + raise HttpError("{0} failed".format(log_msg)) + def http_get(url, headers=None, max_retry=3, chk_proxy=False): - return http_request("GET", url, data=None, headers=headers, + return http_request("GET", url, data=None, headers=headers, max_retry=max_retry, chk_proxy=chk_proxy) + def http_head(url, headers=None, max_retry=3, chk_proxy=False): - return http_request("HEAD", url, None, headers=headers, + return http_request("HEAD", url, None, headers=headers, max_retry=max_retry, chk_proxy=chk_proxy) + def http_post(url, data, headers=None, max_retry=3, chk_proxy=False): - return http_request("POST", url, data, headers=headers, + return http_request("POST", url, data, headers=headers, max_retry=max_retry, chk_proxy=chk_proxy) + def http_put(url, data, headers=None, max_retry=3, chk_proxy=False): - return http_request("PUT", url, data, headers=headers, + return http_request("PUT", url, data, headers=headers, max_retry=max_retry, chk_proxy=chk_proxy) + def http_delete(url, headers=None, max_retry=3, chk_proxy=False): - return http_request("DELETE", url, None, headers=headers, + return http_request("DELETE", url, None, headers=headers, max_retry=max_retry, chk_proxy=chk_proxy) -#End REST api util functions +# End REST api util functions diff --git a/azurelinuxagent/common/utils/textutil.py b/azurelinuxagent/common/utils/textutil.py index f03c7e6..59b8fe7 100644 --- a/azurelinuxagent/common/utils/textutil.py +++ b/azurelinuxagent/common/utils/textutil.py @@ -251,8 +251,14 @@ def set_ini_config(config, name, val): def remove_bom(c): - if str_to_ord(c[0]) > 128 and str_to_ord(c[1]) > 128 and \ - str_to_ord(c[2]) > 128: + ''' + bom is comprised of a sequence of three chars,0xef, 0xbb, 0xbf, in case of utf-8. 
+ ''' + if not is_str_none_or_whitespace(c) and \ + len(c) > 2 and \ + str_to_ord(c[0]) > 128 and \ + str_to_ord(c[1]) > 128 and \ + str_to_ord(c[2]) > 128: c = c[3:] return c @@ -277,3 +283,34 @@ def b64encode(s): if PY_VERSION_MAJOR > 2: return base64.b64encode(bytes(s, 'utf-8')).decode('utf-8') return base64.b64encode(s) + + +def b64decode(s): + from azurelinuxagent.common.version import PY_VERSION_MAJOR + if PY_VERSION_MAJOR > 2: + return base64.b64decode(s).decode('utf-8') + return base64.b64decode(s) + + +def safe_shlex_split(s): + import shlex + from azurelinuxagent.common.version import PY_VERSION + if PY_VERSION[:2] == (2, 6): + return shlex.split(s.encode('utf-8')) + return shlex.split(s) + + +def parse_json(json_str): + """ + Parse json string and return a resulting dictionary + """ + # trim null and whitespaces + result = None + if not is_str_none_or_whitespace(json_str): + import json + result = json.loads(json_str.rstrip(' \t\r\n\0')) + + return result + +def is_str_none_or_whitespace(s): + return s is None or len(s) == 0 or s.isspace() diff --git a/azurelinuxagent/common/version.py b/azurelinuxagent/common/version.py index 6c4b475..1099e25 100644 --- a/azurelinuxagent/common/version.py +++ b/azurelinuxagent/common/version.py @@ -26,23 +26,59 @@ from azurelinuxagent.common.utils.flexible_version import FlexibleVersion from azurelinuxagent.common.future import ustr +""" +Add this workaround for detecting F5 products because BIG-IP/IQ/etc do not show +their version info in the /etc/product-version location. Instead, the version +and product information is contained in the /VERSION file. 
+""" +def get_f5_platform(): + result = [None,None,None,None] + f5_version = re.compile("^Version: (\d+\.\d+\.\d+)") + f5_product = re.compile("^Product: ([\w-]+)") + + with open('/VERSION', 'r') as fh: + content = fh.readlines() + for line in content: + version_matches = f5_version.match(line) + product_matches = f5_product.match(line) + if version_matches: + result[1] = version_matches.group(1) + elif product_matches: + result[3] = product_matches.group(1) + if result[3] == "BIG-IP": + result[0] = "bigip" + result[2] = "bigip" + elif result[3] == "BIG-IQ": + result[0] = "bigiq" + result[2] = "bigiq" + elif result[3] == "iWorkflow": + result[0] = "iworkflow" + result[2] = "iworkflow" + return result + def get_distro(): if 'FreeBSD' in platform.system(): release = re.sub('\-.*\Z', '', ustr(platform.release())) osinfo = ['freebsd', release, '', 'freebsd'] elif 'linux_distribution' in dir(platform): - osinfo = list(platform.linux_distribution(full_distribution_name=0)) + osinfo = list(platform.linux_distribution(full_distribution_name=0, + supported_dists=platform._supported_dists+('alpine',))) full_name = platform.linux_distribution()[0].strip() osinfo.append(full_name) else: osinfo = platform.dist() # The platform.py lib has issue with detecting oracle linux distribution. - # Merge the following patch provided by oracle as a temparory fix. + # Merge the following patch provided by oracle as a temporary fix. if os.path.exists("/etc/oracle-release"): osinfo[2] = "oracle" osinfo[3] = "Oracle Linux" + # The platform.py lib has issue with detecting BIG-IP linux distribution. + # Merge the following patch provided by F5. 
+ if os.path.exists("/shared/vadc"): + osinfo = get_f5_platform() + # Remove trailing whitespace and quote in distro name osinfo[0] = osinfo[0].strip('"').strip(' ').lower() return osinfo @@ -50,7 +86,7 @@ def get_distro(): AGENT_NAME = "WALinuxAgent" AGENT_LONG_NAME = "Azure Linux Agent" -AGENT_VERSION = '2.1.5' +AGENT_VERSION = '2.2.2' AGENT_LONG_VERSION = "{0}-{1}".format(AGENT_NAME, AGENT_VERSION) AGENT_DESCRIPTION = """\ The Azure Linux Agent supports the provisioning and running of Linux @@ -65,6 +101,8 @@ AGENT_PATTERN = "{0}-(.*)".format(AGENT_NAME) AGENT_NAME_PATTERN = re.compile(AGENT_PATTERN) AGENT_DIR_PATTERN = re.compile(".*/{0}".format(AGENT_PATTERN)) +EXT_HANDLER_PATTERN = b".*/WALinuxAgent-(\w.\w.\w[.\w]*)-.*-run-exthandlers" +EXT_HANDLER_REGEX = re.compile(EXT_HANDLER_PATTERN) # Set the CURRENT_AGENT and CURRENT_VERSION to match the agent directory name # - This ensures the agent will "see itself" using the same name and version @@ -83,6 +121,23 @@ def set_current_agent(): return agent, FlexibleVersion(version) CURRENT_AGENT, CURRENT_VERSION = set_current_agent() +def set_goal_state_agent(): + agent = None + pids = [pid for pid in os.listdir('/proc') if pid.isdigit()] + for pid in pids: + try: + pname = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read() + match = EXT_HANDLER_REGEX.match(pname) + if match: + agent = match.group(1) + break + except IOError: + continue + if agent is None: + agent = CURRENT_VERSION + return agent +GOAL_STATE_AGENT_VERSION = set_goal_state_agent() + def is_current_agent_installed(): return CURRENT_AGENT == AGENT_LONG_VERSION @@ -114,3 +169,4 @@ def is_snappy(): if is_snappy(): DISTRO_FULL_NAME = "Snappy Ubuntu Core" + diff --git a/azurelinuxagent/daemon/resourcedisk/default.py b/azurelinuxagent/daemon/resourcedisk/default.py index d2e400a..18ce884 100644 --- a/azurelinuxagent/daemon/resourcedisk/default.py +++ b/azurelinuxagent/daemon/resourcedisk/default.py @@ -28,23 +28,26 @@ import 
azurelinuxagent.common.utils.shellutil as shellutil from azurelinuxagent.common.exception import ResourceDiskError from azurelinuxagent.common.osutil import get_osutil -DATALOSS_WARNING_FILE_NAME="DATALOSS_WARNING_README.txt" -DATA_LOSS_WARNING="""\ +DATALOSS_WARNING_FILE_NAME = "DATALOSS_WARNING_README.txt" +DATA_LOSS_WARNING = """\ WARNING: THIS IS A TEMPORARY DISK. Any data stored on this drive is SUBJECT TO LOSS and THERE IS NO WAY TO RECOVER IT. Please do not use this disk for storing any personal or application data. -For additional details to please refer to the MSDN documentation at : http://msdn.microsoft.com/en-us/library/windowsazure/jj672979.aspx +For additional details to please refer to the MSDN documentation at : +http://msdn.microsoft.com/en-us/library/windowsazure/jj672979.aspx """ + class ResourceDiskHandler(object): def __init__(self): self.osutil = get_osutil() + self.fs = conf.get_resourcedisk_filesystem() def start_activate_resource_disk(self): - disk_thread = threading.Thread(target = self.run) + disk_thread = threading.Thread(target=self.run) disk_thread.start() def run(self): @@ -59,18 +62,18 @@ class ResourceDiskHandler(object): logger.info("Activate resource disk") try: mount_point = conf.get_resourcedisk_mountpoint() - fs = conf.get_resourcedisk_filesystem() - mount_point = self.mount_resource_disk(mount_point, fs) - warning_file = os.path.join(mount_point, DATALOSS_WARNING_FILE_NAME) + mount_point = self.mount_resource_disk(mount_point) + warning_file = os.path.join(mount_point, + DATALOSS_WARNING_FILE_NAME) try: fileutil.write_file(warning_file, DATA_LOSS_WARNING) except IOError as e: - logger.warn("Failed to write data loss warnning:{0}", e) + logger.warn("Failed to write data loss warning:{0}", e) return mount_point except ResourceDiskError as e: logger.error("Failed to mount resource disk {0}", e) add_event(name="WALA", is_success=False, message=ustr(e), - op=WALAEventOperation.ActivateResourceDisk) + 
op=WALAEventOperation.ActivateResourceDisk) def enable_swap(self, mount_point): logger.info("Enable swap") @@ -80,67 +83,134 @@ class ResourceDiskHandler(object): except ResourceDiskError as e: logger.error("Failed to enable swap {0}", e) - def mount_resource_disk(self, mount_point, fs): + def mount_resource_disk(self, mount_point): device = self.osutil.device_for_ide_port(1) if device is None: raise ResourceDiskError("unable to detect disk topology") - device = "/dev/" + device - mountlist = shellutil.run_get_output("mount")[1] - existing = self.osutil.get_mount_point(mountlist, device) + device = "/dev/{0}".format(device) + partition = device + "1" + mount_list = shellutil.run_get_output("mount")[1] + existing = self.osutil.get_mount_point(mount_list, device) - if(existing): - logger.info("Resource disk {0}1 is already mounted", device) + if existing: + logger.info("Resource disk [{0}] is already mounted [{1}]", + partition, + existing) return existing fileutil.mkdir(mount_point, mode=0o755) - - logger.info("Detect GPT...") - partition = device + "1" + logger.info("Examining partition table") ret = shellutil.run_get_output("parted {0} print".format(device)) if ret[0]: - raise ResourceDiskError("({0}) {1}".format(device, ret[1])) + raise ResourceDiskError("Could not determine partition info for " + "{0}: {1}".format(device, ret[1])) + + force_option = 'F' + if self.fs == 'xfs': + force_option = 'f' + mkfs_string = "mkfs.{0} {1} -{2}".format(self.fs, partition, force_option) if "gpt" in ret[1]: - logger.info("GPT detected") - logger.info("Get GPT partitions") - parts = [x for x in ret[1].split("\n") if re.match("^\s*[0-9]+", x)] - logger.info("Found more than {0} GPT partitions.", len(parts)) + logger.info("GPT detected, finding partitions") + parts = [x for x in ret[1].split("\n") if + re.match("^\s*[0-9]+", x)] + logger.info("Found {0} GPT partition(s).", len(parts)) if len(parts) > 1: - logger.info("Remove old GPT partitions") + logger.info("Removing old GPT 
partitions") for i in range(1, len(parts) + 1): - logger.info("Remove partition: {0}", i) + logger.info("Remove partition {0}", i) shellutil.run("parted {0} rm {1}".format(device, i)) - logger.info("Create a new GPT partition using entire disk space") + logger.info("Creating new GPT partition") shellutil.run("parted {0} mkpart primary 0% 100%".format(device)) - logger.info("Format partition: {0} with fstype {1}",partition,fs) - shellutil.run("mkfs." + fs + " " + partition + " -F") + logger.info("Format partition [{0}]", mkfs_string) + shellutil.run(mkfs_string) else: - logger.info("GPT not detected") - logger.info("Check fstype") - ret = shellutil.run_get_output("sfdisk -q -c {0} 1".format(device)) - if ret[1].rstrip() == "7" and fs != "ntfs": - logger.info("The partition is formatted with ntfs") - logger.info("Format partition: {0} with fstype {1}",partition,fs) - shellutil.run("sfdisk -c {0} 1 83".format(device)) - shellutil.run("mkfs." + fs + " " + partition + " -F") - - logger.info("Mount resource disk") - ret = shellutil.run("mount {0} {1}".format(partition, mount_point), - chk_err=False) + logger.info("GPT not detected, determining filesystem") + ret = self.change_partition_type(suppress_message=True, option_str="{0} 1".format(device)) + ptype = ret[1].strip() + if ptype == "7" and self.fs != "ntfs": + logger.info("The partition is formatted with ntfs, updating " + "partition type to 83") + self.change_partition_type(suppress_message=False, option_str="{0} 1 83".format(device)) + logger.info("Format partition [{0}]", mkfs_string) + shellutil.run(mkfs_string) + else: + logger.info("The partition type is {0}", ptype) + + mount_options = conf.get_resourcedisk_mountoptions() + mount_string = self.get_mount_string(mount_options, + partition, + mount_point) + logger.info("Mount resource disk [{0}]", mount_string) + ret = shellutil.run(mount_string, chk_err=False) if ret: - logger.warn("Failed to mount resource disk. Retry mounting") - shellutil.run("mkfs." 
+ fs + " " + partition + " -F") - ret = shellutil.run("mount {0} {1}".format(partition, mount_point)) + # Some kernels seem to issue an async partition re-read after a + # 'parted' command invocation. This causes mount to fail if the + # partition re-read is not complete by the time mount is + # attempted. Seen in CentOS 7.2. Force a sequential re-read of + # the partition and try mounting. + logger.warn("Failed to mount resource disk. " + "Retry mounting after re-reading partition info.") + if shellutil.run("sfdisk -R {0}".format(device), chk_err=False): + shellutil.run("blockdev --rereadpt {0}".format(device), + chk_err=False) + ret = shellutil.run(mount_string, chk_err=False) if ret: - raise ResourceDiskError("({0}) {1}".format(partition, ret)) - - logger.info("Resource disk ({0}) is mounted at {1} with fstype {2}", - device, mount_point, fs) + logger.warn("Failed to mount resource disk. " + "Attempting to format and retry mount.") + shellutil.run(mkfs_string) + ret = shellutil.run(mount_string) + if ret: + raise ResourceDiskError("Could not mount {0} " + "after syncing partition table: " + "{1}".format(partition, ret)) + + logger.info("Resource disk {0} is mounted at {1} with {2}", + device, + mount_point, + self.fs) return mount_point + def change_partition_type(self, suppress_message, option_str): + """ + use sfdisk to change partition type. 
+ First try with --part-type; if fails, fall back to -c + """ + + command_to_use = '--part-type' + input = "sfdisk {0} {1} {2}".format(command_to_use, '-f' if suppress_message else '', option_str) + err_code, output = shellutil.run_get_output(input, chk_err=False, log_cmd=True) + + # fall back to -c + if err_code != 0: + logger.info("sfdisk with --part-type failed [{0}], retrying with -c", err_code) + command_to_use = '-c' + input = "sfdisk {0} {1} {2}".format(command_to_use, '-f' if suppress_message else '', option_str) + err_code, output = shellutil.run_get_output(input, log_cmd=True) + + if err_code == 0: + logger.info('{0} succeeded', + input) + else: + logger.error('{0} failed [{1}: {2}]', + input, + err_code, + output) + + return err_code, output + + @staticmethod + def get_mount_string(mount_options, partition, mount_point): + if mount_options is not None: + return 'mount -o {0} {1} {2}'.format(mount_options, + partition, + mount_point) + else: + return 'mount {0} {1}'.format(partition, mount_point) + def create_swap_space(self, mount_point, size_mb): size_kb = size_mb * 1024 size = size_kb * 1024 @@ -166,13 +236,17 @@ class ResourceDiskHandler(object): def mkfile(self, filename, nbytes): """ - Create a non-sparse file of that size. Deletes and replaces existing file. + Create a non-sparse file of that size. Deletes and replaces existing + file. - To allow efficient execution, fallocate will be tried first. This includes - ``os.posix_fallocate`` on Python 3.3+ (unix) and the ``fallocate`` command + To allow efficient execution, fallocate will be tried first. This + includes + ``os.posix_fallocate`` on Python 3.3+ (unix) and the ``fallocate`` + command in the popular ``util-linux{,-ng}`` package. - A dd fallback will be tried too. When size < 64M, perform single-pass dd. + A dd fallback will be tried too. When size < 64M, perform + single-pass dd. Otherwise do two-pass dd. 
""" @@ -185,35 +259,48 @@ class ResourceDiskHandler(object): if os.path.isfile(filename): os.remove(filename) - # os.posix_fallocate - if sys.version_info >= (3, 3): - # Probable errors: - # - OSError: Seen on Cygwin, libc notimpl? - # - AttributeError: What if someone runs this under... - with open(filename, 'w') as f: - try: - os.posix_fallocate(f.fileno(), 0, nbytes) - return 0 - except: - # Not confident with this thing, just keep trying... - pass - - # fallocate command + # If file system is xfs, use dd right away as we have been reported that + # swap enabling fails in xfs fs when disk space is allocated with fallocate + ret = 0 fn_sh = shellutil.quote((filename,)) - ret = shellutil.run(u"fallocate -l {0} {1}".format(nbytes, fn_sh)) - if ret != 127: # 127 = command not found - return ret + if self.fs != 'xfs': + # os.posix_fallocate + if sys.version_info >= (3, 3): + # Probable errors: + # - OSError: Seen on Cygwin, libc notimpl? + # - AttributeError: What if someone runs this under... + with open(filename, 'w') as f: + try: + os.posix_fallocate(f.fileno(), 0, nbytes) + return 0 + except: + # Not confident with this thing, just keep trying... 
+ pass + + # fallocate command + ret = shellutil.run( + u"umask 0077 && fallocate -l {0} {1}".format(nbytes, fn_sh)) + if ret == 0: + return ret + + logger.info("fallocate unsuccessful, falling back to dd") # dd fallback dd_maxbs = 64 * 1024 ** 2 - dd_cmd = "dd if=/dev/zero bs={0} count={1} conv=notrunc of={2}" + dd_cmd = "umask 0077 && dd if=/dev/zero bs={0} count={1} " \ + "conv=notrunc of={2}" blocks = int(nbytes / dd_maxbs) if blocks > 0: - ret = shellutil.run(dd_cmd.format(dd_maxbs, fn_sh, blocks)) << 8 + ret = shellutil.run(dd_cmd.format(dd_maxbs, blocks, fn_sh)) << 8 remains = int(nbytes % dd_maxbs) if remains > 0: - ret += shellutil.run(dd_cmd.format(remains, fn_sh, 1)) + ret += shellutil.run(dd_cmd.format(remains, 1, fn_sh)) + + if ret == 0: + logger.info("dd successful") + else: + logger.error("dd unsuccessful") return ret diff --git a/azurelinuxagent/daemon/resourcedisk/freebsd.py b/azurelinuxagent/daemon/resourcedisk/freebsd.py index 36a3ac9..4ca0058 100644 --- a/azurelinuxagent/daemon/resourcedisk/freebsd.py +++ b/azurelinuxagent/daemon/resourcedisk/freebsd.py @@ -57,19 +57,24 @@ class FreeBSDResourceDiskHandler(ResourceDiskHandler): raise ResourceDiskError("Unable to detect resource disk device:{0}".format(output)) disks = self.parse_gpart_list(output) - err, output = shellutil.run_get_output('camcontrol periphlist 2:1:0') - if err: - raise ResourceDiskError("Unable to detect resource disk device:{0}".format(output)) + device = self.osutil.device_for_ide_port(1) + if device is None: + # fallback logic to find device + err, output = shellutil.run_get_output('camcontrol periphlist 2:1:0') + if err: + # try again on "3:1:0" + err, output = shellutil.run_get_output('camcontrol periphlist 3:1:0') + if err: + raise ResourceDiskError("Unable to detect resource disk device:{0}".format(output)) # 'da1: generation: 4 index: 1 status: MORE\npass2: generation: 4 index: 2 status: LAST\n' - device = None - for line in output.split('\n'): - index = line.find(':') - 
if index > 0: - geom_name = line[:index] - if geom_name in disks: - device = geom_name - break + for line in output.split('\n'): + index = line.find(':') + if index > 0: + geom_name = line[:index] + if geom_name in disks: + device = geom_name + break if not device: raise ResourceDiskError("Unable to detect resource disk device.") diff --git a/azurelinuxagent/ga/env.py b/azurelinuxagent/ga/env.py index 2d67d4b..5d8da5c 100644 --- a/azurelinuxagent/ga/env.py +++ b/azurelinuxagent/ga/env.py @@ -45,7 +45,8 @@ class EnvHandler(object): self.stopped = True self.hostname = None self.dhcpid = None - self.server_thread=None + self.server_thread = None + self.dhcp_warning_enabled = True def run(self): if not self.stopped: @@ -87,8 +88,11 @@ class EnvHandler(object): def handle_dhclient_restart(self): if self.dhcpid is None: - logger.warn("Dhcp client is not running. ") + if self.dhcp_warning_enabled: + logger.warn("Dhcp client is not running. ") self.dhcpid = self.osutil.get_dhcp_pid() + # disable subsequent error logging + self.dhcp_warning_enabled = self.dhcpid is not None return #The dhcp process hasn't changed since last check diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py index d3c8f32..c9e6b5f 100644 --- a/azurelinuxagent/ga/exthandlers.py +++ b/azurelinuxagent/ga/exthandlers.py @@ -21,6 +21,7 @@ import glob import json import os import shutil +import stat import subprocess import time import zipfile @@ -108,6 +109,45 @@ def parse_ext_status(ext_status, data): for substatus in substatus_list: ext_status.substatusList.append(parse_ext_substatus(substatus)) +# This code migrates, if it exists, handler state and status from an +# agent-owned directory into the handler-owned config directory +# +# Notes: +# - The v2.0.x branch wrote all handler-related state into the handler-owned +# config directory (e.g., /var/lib/waagent/Microsoft.Azure.Extensions.LinuxAsm-2.0.1/config). 
+# - The v2.1.x branch original moved that state into an agent-owned handler +# state directory (e.g., /var/lib/waagent/handler_state). +# - This move can cause v2.1.x agents to multiply invoke a handler's install +# command. It also makes clean-up more difficult since the agent must +# remove the state as well as the handler directory. +def migrate_handler_state(): + handler_state_path = os.path.join(conf.get_lib_dir(), "handler_state") + if not os.path.isdir(handler_state_path): + return + + for handler_path in glob.iglob(os.path.join(handler_state_path, "*")): + handler = os.path.basename(handler_path) + handler_config_path = os.path.join(conf.get_lib_dir(), handler, "config") + if os.path.isdir(handler_config_path): + for file in ("State", "Status"): + from_path = os.path.join(handler_state_path, handler, file.lower()) + to_path = os.path.join(handler_config_path, "Handler" + file) + if os.path.isfile(from_path) and not os.path.isfile(to_path): + try: + shutil.move(from_path, to_path) + except Exception as e: + logger.warn( + "Exception occurred migrating {0} {1} file: {2}", + handler, + file, + str(e)) + + try: + shutil.rmtree(handler_state_path) + except Exception as e: + logger.warn("Exception occurred removing {0}: {1}", handler_state_path, str(e)) + return + class ExtHandlerState(object): NotInstalled = "NotInstalled" Installed = "Installed" @@ -122,6 +162,7 @@ class ExtHandlersHandler(object): self.ext_handlers = None self.last_etag = None self.log_report = False + self.log_etag = True def run(self): self.ext_handlers, etag = None, None @@ -135,16 +176,12 @@ class ExtHandlersHandler(object): add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=msg) return - if self.last_etag is not None and self.last_etag == etag: - msg = u"Incarnation {0} has no extension updates".format(etag) - logger.verbose(msg) - self.log_report = False - else: - msg = u"Handle extensions updates for incarnation {0}".format(etag) - logger.info(msg) - 
self.log_report = True #Log status report success on new config - self.handle_ext_handlers() - self.last_etag = etag + msg = u"Handle extensions updates for incarnation {0}".format(etag) + logger.verbose(msg) + # Log status report success on new config + self.log_report = True + self.handle_ext_handlers(etag) + self.last_etag = etag self.report_ext_handlers_status() @@ -152,18 +189,36 @@ class ExtHandlersHandler(object): self.report_ext_handlers_status() return - def handle_ext_handlers(self): + def handle_ext_handlers(self, etag=None): if self.ext_handlers.extHandlers is None or \ len(self.ext_handlers.extHandlers) == 0: - logger.info("No ext handler config found") + logger.verbose("No extension handler config found") return + if conf.get_enable_overprovisioning(): + artifacts_profile = self.protocol.get_artifacts_profile() + if artifacts_profile and artifacts_profile.is_on_hold(): + logger.info("Extension handling is on hold") + return + for ext_handler in self.ext_handlers.extHandlers: - #TODO handle install in sequence, enable in parallel - self.handle_ext_handler(ext_handler) + # TODO: handle install in sequence, enable in parallel + self.handle_ext_handler(ext_handler, etag) - def handle_ext_handler(self, ext_handler): + def handle_ext_handler(self, ext_handler, etag): ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) + + ext_handler_i.decide_version() + if not ext_handler_i.is_upgrade and self.last_etag == etag: + if self.log_etag: + ext_handler_i.logger.verbose("Version {0} is current for etag {1}", + ext_handler_i.pkg.version, + etag) + self.log_etag = False + return + + self.log_etag = True + try: state = ext_handler.properties.state ext_handler_i.logger.info("Expected handler state: {0}", state) @@ -182,12 +237,10 @@ class ExtHandlersHandler(object): def handle_enable(self, ext_handler_i): - ext_handler_i.decide_version() - old_ext_handler_i = ext_handler_i.get_installed_ext_handler() if old_ext_handler_i is not None and \ 
old_ext_handler_i.version_gt(ext_handler_i): - raise ExtensionError(u"Downgrade not allowed") + raise ExtensionError(u"Downgrade not allowed") handler_state = ext_handler_i.get_handler_state() ext_handler_i.logger.info("Current handler state is: {0}", handler_state) @@ -246,17 +299,14 @@ class ExtHandlersHandler(object): message=ustr(e)) logger.verbose("Report vm agent status") - try: self.protocol.report_vm_status(vm_status) + if self.log_report: + logger.verbose("Successfully reported vm agent status") except ProtocolError as e: message = "Failed to report vm agent status: {0}".format(e) add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message) - if self.log_report: - logger.verbose("Successfully reported vm agent status") - - def report_ext_handler_status(self, vm_status, ext_handler): ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) @@ -287,6 +337,7 @@ class ExtHandlerInstance(object): self.protocol = protocol self.operation = None self.pkg = None + self.is_upgrade = False prefix = "[{0}]".format(self.get_full_name()) self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix) @@ -301,7 +352,7 @@ class ExtHandlerInstance(object): logger.LogLevel.INFO, log_file) def decide_version(self): - self.logger.info("Decide which version to use") + self.logger.verbose("Decide which version to use") try: pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler) except ProtocolError as e: @@ -309,9 +360,10 @@ class ExtHandlerInstance(object): # Determine the desired and installed versions requested_version = FlexibleVersion(self.ext_handler.properties.version) - installed_version = FlexibleVersion(self.get_installed_version()) - if installed_version is None: - installed_version = requested_version + installed_version_string = self.get_installed_version() + installed_version = requested_version \ + if installed_version_string is None \ + else FlexibleVersion(installed_version_string) # Divide packages # - Find the installed package 
(its version must exactly match) @@ -392,10 +444,15 @@ class ExtHandlerInstance(object): self.pkg = selected_pkg self.ext_handler.properties.version = selected_pkg.version + # Note if the selected package is greater than that installed + if installed_pkg is None \ + or FlexibleVersion(self.pkg.version) > FlexibleVersion(installed_pkg.version): + self.is_upgrade = True + if self.pkg is None: raise ExtensionError("Failed to find any valid extension package") - self.logger.info("Use version: {0}", self.pkg.version) + self.logger.verbose("Use version: {0}", self.pkg.version) return def version_gt(self, other): @@ -464,8 +521,10 @@ class ExtHandlerInstance(object): for uri in self.pkg.uris: try: package = self.protocol.download_ext_handler_pkg(uri.uri) - except ProtocolError as e: - logger.warn("Failed download extension: {0}", e) + if package is not None: + break + except Exception as e: + logger.warn("Error while downloading extension: {0}", e) if package is None: raise ExtensionError("Failed to download extension") @@ -479,8 +538,10 @@ class ExtHandlerInstance(object): except IOError as e: raise ExtensionError(u"Failed to write and unzip plugin", e) - chmod = "find {0} -type f | xargs chmod u+x".format(self.get_base_dir()) - shellutil.run(chmod) + #Add user execute permission to all files under the base dir + for file in fileutil.get_all_files(self.get_base_dir()): + fileutil.chmod(file, os.stat(file).st_mode | stat.S_IXUSR) + self.report_event(message="Download succeeded") self.logger.info("Initialize extension directory") @@ -547,10 +608,6 @@ class ExtHandlerInstance(object): def rm_ext_handler_dir(self): try: - handler_state_dir = self.get_handler_state_dir() - if os.path.isdir(handler_state_dir): - self.logger.info("Remove ext handler dir: {0}", handler_state_dir) - shutil.rmtree(handler_state_dir) base_dir = self.get_base_dir() if os.path.isdir(base_dir): self.logger.info("Remove ext handler dir: {0}", base_dir) @@ -746,28 +803,18 @@ class 
ExtHandlerInstance(object): fileutil.write_file(self.get_env_file(), json.dumps(env)) except IOError as e: raise ExtensionError(u"Failed to save handler environment", e) - - def get_handler_state_dir(self): - return os.path.join(conf.get_lib_dir(), "handler_state", - self.get_full_name()) def set_handler_state(self, handler_state): - state_dir = self.get_handler_state_dir() - if not os.path.exists(state_dir): - try: - fileutil.mkdir(state_dir, 0o700) - except IOError as e: - self.logger.error("Failed to create state dir: {0}", e) - + state_dir = self.get_conf_dir() try: - state_file = os.path.join(state_dir, "state") + state_file = os.path.join(state_dir, "HandlerState") fileutil.write_file(state_file, handler_state) except IOError as e: self.logger.error("Failed to set state: {0}", e) def get_handler_state(self): - state_dir = self.get_handler_state_dir() - state_file = os.path.join(state_dir, "state") + state_dir = self.get_conf_dir() + state_file = os.path.join(state_dir, "HandlerState") if not os.path.isfile(state_file): return ExtHandlerState.NotInstalled @@ -777,32 +824,25 @@ class ExtHandlerInstance(object): self.logger.error("Failed to get state: {0}", e) return ExtHandlerState.NotInstalled - def set_handler_status(self, status="NotReady", message="", - code=0): - state_dir = self.get_handler_state_dir() - if not os.path.exists(state_dir): - try: - fileutil.mkdir(state_dir, 0o700) - except IOError as e: - self.logger.error("Failed to create state dir: {0}", e) - + def set_handler_status(self, status="NotReady", message="", code=0): + state_dir = self.get_conf_dir() + handler_status = ExtHandlerStatus() handler_status.name = self.ext_handler.name handler_status.version = self.ext_handler.properties.version handler_status.message = message handler_status.code = code handler_status.status = status - status_file = os.path.join(state_dir, "status") + status_file = os.path.join(state_dir, "HandlerStatus") try: - fileutil.write_file(status_file, - 
json.dumps(get_properties(handler_status))) + fileutil.write_file(status_file, json.dumps(get_properties(handler_status))) except (IOError, ValueError, ProtocolError) as e: self.logger.error("Failed to save handler status: {0}", e) def get_handler_status(self): - state_dir = self.get_handler_state_dir() - status_file = os.path.join(state_dir, "status") + state_dir = self.get_conf_dir() + status_file = os.path.join(state_dir, "HandlerStatus") if not os.path.isfile(status_file): return None @@ -899,4 +939,4 @@ class HandlerManifest(object): update_mode = self.data['handlerManifest'].get('updateMode') if update_mode is None: return True - return update_mode.low() == "updatewithinstall" + return update_mode.lower() == "updatewithinstall" diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py index f49cef8..478a7a3 100644 --- a/azurelinuxagent/ga/monitor.py +++ b/azurelinuxagent/ga/monitor.py @@ -183,8 +183,11 @@ class MonitorHandler(object): while True: if (datetime.datetime.now() - last_heartbeat) > period: last_heartbeat = datetime.datetime.now() - add_event(op=WALAEventOperation.HeartBeat, name="WALA", - is_success=True) + add_event( + op=WALAEventOperation.HeartBeat, + name=CURRENT_AGENT, + version=CURRENT_VERSION, + is_success=True) try: self.collect_and_send_events() except Exception as e: diff --git a/azurelinuxagent/ga/update.py b/azurelinuxagent/ga/update.py index e89608a..996484b 100644 --- a/azurelinuxagent/ga/update.py +++ b/azurelinuxagent/ga/update.py @@ -16,12 +16,12 @@ # # Requires Python 2.4+ and Openssl 1.0+ # + import glob import json import os import platform import re -import shlex import shutil import signal import subprocess @@ -59,10 +59,14 @@ CHILD_LAUNCH_RESTART_MAX = 3 CHILD_POLL_INTERVAL = 60 MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted +RETAIN_INTERVAL = 24 * 60 * 60 # Retain interval for black list GOAL_STATE_INTERVAL = 25 REPORT_STATUS_INTERVAL = 15 -RETAIN_INTERVAL = 24 * 60 * 60 # Retain 
interval for black list + +ORPHAN_WAIT_INTERVAL = 15 * 60 * 60 + +AGENT_SENTINAL_FILE = "current_version" def get_update_handler(): @@ -81,7 +85,6 @@ class UpdateHandler(object): self.protocol_util = get_protocol_util() self.running = True - self.last_etag = None self.last_attempt_time = None self.agents = [] @@ -126,7 +129,7 @@ class UpdateHandler(object): try: # Launch the correct Python version for python-based agents - cmds = shlex.split(agent_cmd) + cmds = textutil.safe_shlex_split(agent_cmd) if cmds[0].lower() == "python": cmds[0] = get_python_cmd() agent_cmd = " ".join(cmds) @@ -218,13 +221,21 @@ class UpdateHandler(object): from azurelinuxagent.ga.env import get_env_handler get_env_handler().run() - from azurelinuxagent.ga.exthandlers import get_exthandlers_handler + from azurelinuxagent.ga.exthandlers import get_exthandlers_handler, migrate_handler_state exthandlers_handler = get_exthandlers_handler() + migrate_handler_state() - # TODO: Add means to stop running try: + self._ensure_no_orphans() + self._emit_restart_event() + + # TODO: Add means to stop running while self.running: - if self._ensure_latest_agent(): + if self._is_orphaned: + logger.info("Goal state agent {0} was orphaned -- exiting", CURRENT_AGENT) + break + + if self._upgrade_available(): if len(self.agents) > 0: logger.info( u"Agent {0} discovered {1} as an update and will exit", @@ -234,16 +245,26 @@ class UpdateHandler(object): exthandlers_handler.run() - time.sleep(25) + time.sleep(GOAL_STATE_INTERVAL) except Exception as e: logger.warn(u"Agent {0} failed with exception: {1}", CURRENT_AGENT, ustr(e)) sys.exit(1) + return + self._shutdown() sys.exit(0) return def forward_signal(self, signum, frame): + # Note: + # - At present, the handler is registered only for SIGTERM. + # However, clean shutdown is both SIGTERM and SIGKILL. + # A SIGKILL handler is not being registered at this time to + # minimize perturbing the code. 
+ if signum in (signal.SIGTERM, signal.SIGKILL): + self._shutdown() + if self.child_process is None: return @@ -258,13 +279,14 @@ class UpdateHandler(object): self.signal_handler(signum, frame) elif self.signal_handler is signal.SIG_DFL: if signum == signal.SIGTERM: + # TODO: This should set self.running to False vs. just exiting sys.exit(0) return def get_latest_agent(self): """ If autoupdate is enabled, return the most current, downloaded, - non-blacklisted agent (if any). + non-blacklisted agent which is not the current version (if any). Otherwise, return None (implying to use the installed agent). """ @@ -272,10 +294,27 @@ class UpdateHandler(object): return None self._load_agents() - available_agents = [agent for agent in self.agents if agent.is_available] + available_agents = [agent for agent in self.agents + if agent.is_available + and agent.version > FlexibleVersion(AGENT_VERSION)] + return available_agents[0] if len(available_agents) >= 1 else None - def _ensure_latest_agent(self, base_version=CURRENT_VERSION): + def _emit_restart_event(self): + if not self._is_clean_start: + msg = u"{0} did not terminate cleanly".format(CURRENT_AGENT) + logger.info(msg) + add_event( + AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.Restart, + is_success=False, + message=msg) + + self._set_sentinal() + return + + def _upgrade_available(self, base_version=CURRENT_VERSION): # Ignore new agents if updating is disabled if not conf.get_autoupdate_enabled(): return False @@ -306,11 +345,8 @@ class UpdateHandler(object): message=msg) return False - if self.last_etag is not None and self.last_etag == etag: - logger.info(u"Incarnation {0} has no agent updates", etag) - return False - - manifests = [m for m in manifest_list.vmAgentManifests if m.family == family] + manifests = [m for m in manifest_list.vmAgentManifests \ + if m.family == family and len(m.versionsManifestUris) > 0] if len(manifests) == 0: logger.info(u"Incarnation {0} has no agent family {1} updates", 
etag, family) return False @@ -318,7 +354,8 @@ class UpdateHandler(object): try: pkg_list = protocol.get_vmagent_pkgs(manifests[0]) except ProtocolError as e: - msg= u"Incarnation {0} failed to get {1} package list: {2}".format( + msg = u"Incarnation {0} failed to get {1} package list: " \ + u"{2}".format( etag, family, ustr(e)) @@ -331,18 +368,51 @@ class UpdateHandler(object): message=msg) return False - # Set the agents to those available for download at least as current as the existing agent - # and remove from disk any agent no longer reported to the VM. + # Set the agents to those available for download at least as current + # as the existing agent and remove from disk any agent no longer + # reported to the VM. # Note: - # The code leaves on disk available, but blacklisted, agents so as to preserve the state. - # Otherwise, those agents could be again downloaded and inappropriately retried. - self._set_agents([GuestAgent(pkg=pkg) for pkg in pkg_list.versions]) + # The code leaves on disk available, but blacklisted, agents so as to + # preserve the state. Otherwise, those agents could be again + # downloaded and inappropriately retried. 
+ host = None + if protocol and protocol.client: + host = protocol.client.get_host_plugin() + self._set_agents([GuestAgent(pkg=pkg, host=host) for pkg in pkg_list.versions]) self._purge_agents() self._filter_blacklisted_agents() # Return True if agents more recent than the current are available return len(self.agents) > 0 and self.agents[0].version > base_version + def _ensure_no_orphans(self, orphan_wait_interval=ORPHAN_WAIT_INTERVAL): + previous_pid_file, pid_file = self._write_pid_file() + if previous_pid_file is not None: + try: + pid = fileutil.read_file(previous_pid_file) + wait_interval = orphan_wait_interval + while self.osutil.check_pid_alive(pid): + wait_interval -= GOAL_STATE_INTERVAL + if wait_interval <= 0: + logger.warn( + u"{0} forcibly terminated orphan process {1}", + CURRENT_AGENT, + pid) + os.kill(pid, signal.SIGKILL) + break + + logger.info( + u"{0} waiting for orphan process {1} to terminate", + CURRENT_AGENT, + pid) + time.sleep(GOAL_STATE_INTERVAL) + + except Exception as e: + logger.warn( + u"Exception occurred waiting for orphan agent to terminate: {0}", + ustr(e)) + return + def _evaluate_agent_health(self, latest_agent): """ Evaluate the health of the selected agent: If it is restarting @@ -375,18 +445,61 @@ class UpdateHandler(object): self.agents = [agent for agent in self.agents if not agent.is_blacklisted] return + def _get_pid_files(self): + pid_file = conf.get_agent_pid_file_path() + + pid_dir = os.path.dirname(pid_file) + pid_name = os.path.basename(pid_file) + + pid_re = re.compile("(\d+)_{0}".format(re.escape(pid_name))) + pid_files = [int(pid_re.match(f).group(1)) for f in os.listdir(pid_dir) if pid_re.match(f)] + pid_files.sort() + + pid_index = -1 if len(pid_files) <= 0 else pid_files[-1] + previous_pid_file = None \ + if pid_index < 0 \ + else os.path.join(pid_dir, "{0}_{1}".format(pid_index, pid_name)) + pid_file = os.path.join(pid_dir, "{0}_{1}".format(pid_index+1, pid_name)) + return previous_pid_file, pid_file + + 
@property + def _is_clean_start(self): + if not os.path.isfile(self._sentinal_file_path()): + return True + + try: + if fileutil.read_file(self._sentinal_file_path()) != CURRENT_AGENT: + return True + except Exception as e: + logger.warn( + u"Exception reading sentinal file {0}: {1}", + self._sentinal_file_path(), + str(e)) + + return False + + @property + def _is_orphaned(self): + parent_pid = os.getppid() + if parent_pid in (1, None): + return True + + if not os.path.isfile(conf.get_agent_pid_file_path()): + return True + + return fileutil.read_file(conf.get_agent_pid_file_path()) != ustr(parent_pid) + def _load_agents(self): """ Load all non-blacklisted agents currently on disk. """ - if len(self.agents) <= 0: - try: - path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) - self._set_agents([GuestAgent(path=agent_dir) - for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)]) - self._filter_blacklisted_agents() - except Exception as e: - logger.warn(u"Exception occurred loading available agents: {0}", ustr(e)) + try: + path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) + self._set_agents([GuestAgent(path=agent_dir) + for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)]) + self._filter_blacklisted_agents() + except Exception as e: + logger.warn(u"Exception occurred loading available agents: {0}", ustr(e)) return def _purge_agents(self): @@ -423,10 +536,51 @@ class UpdateHandler(object): self.agents.sort(key=lambda agent: agent.version, reverse=True) return + def _set_sentinal(self, agent=CURRENT_AGENT): + try: + fileutil.write_file(self._sentinal_file_path(), agent) + except Exception as e: + logger.warn( + u"Exception writing sentinal file {0}: {1}", + self._sentinal_file_path(), + str(e)) + return + + def _sentinal_file_path(self): + return os.path.join(conf.get_lib_dir(), AGENT_SENTINAL_FILE) + + def _shutdown(self): + if not os.path.isfile(self._sentinal_file_path()): + return + + try: + 
os.remove(self._sentinal_file_path()) + except Exception as e: + logger.warn( + u"Exception removing sentinal file {0}: {1}", + self._sentinal_file_path(), + str(e)) + return + + def _write_pid_file(self): + previous_pid_file, pid_file = self._get_pid_files() + try: + fileutil.write_file(pid_file, ustr(os.getpid())) + logger.info(u"{0} running as process {1}", CURRENT_AGENT, ustr(os.getpid())) + except Exception as e: + pid_file = None + logger.warn( + u"Expection writing goal state agent {0} pid to {1}: {2}", + CURRENT_AGENT, + pid_file, + ustr(e)) + return previous_pid_file, pid_file + class GuestAgent(object): - def __init__(self, path=None, pkg=None): + def __init__(self, path=None, pkg=None, host=None): self.pkg = pkg + self.host = host version = None if path is not None: m = AGENT_DIR_PATTERN.match(path) @@ -543,18 +697,15 @@ class GuestAgent(object): return def _download(self): - package = None - for uri in self.pkg.uris: - try: - resp = restutil.http_get(uri.uri, chk_proxy=True) - if resp.status == restutil.httpclient.OK: - package = resp.read() - fileutil.write_file(self.get_agent_pkg_path(), bytearray(package), asbin=True) - logger.info(u"Agent {0} downloaded from {1}", self.name, uri.uri) - break - except restutil.HttpError as e: - logger.warn(u"Agent {0} download from {1} failed", self.name, uri.uri) + if self._fetch(uri.uri): + break + else: + if self.host is not None: + logger.info("Download unsuccessful, falling back to host plugin") + uri, headers = self.host.get_artifact_request(uri.uri, self.host.manifest_uri) + if self._fetch(uri, headers=headers): + break if not os.path.isfile(self.get_agent_pkg_path()): msg = u"Unable to download Agent {0} from any URI".format(self.name) @@ -567,6 +718,23 @@ class GuestAgent(object): raise UpdateError(msg) return + def _fetch(self, uri, headers=None): + package = None + try: + resp = restutil.http_get(uri, chk_proxy=True, headers=headers) + if resp.status == restutil.httpclient.OK: + package = resp.read() + 
fileutil.write_file(self.get_agent_pkg_path(), + bytearray(package), + asbin=True) + logger.info(u"Agent {0} downloaded from {1}", self.name, uri) + except restutil.HttpError as http_error: + logger.verbose(u"Agent {0} download from {1} failed [{2}]", + self.name, + uri, + http_error) + return package is not None + def _load_error(self): try: if self.error is None: diff --git a/azurelinuxagent/pa/deprovision/clearlinux.py b/azurelinuxagent/pa/deprovision/clearlinux.py new file mode 100644 index 0000000..097891b --- /dev/null +++ b/azurelinuxagent/pa/deprovision/clearlinux.py @@ -0,0 +1,31 @@ +# Microsoft Azure Linux Agent +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import azurelinuxagent.common.utils.fileutil as fileutil +from azurelinuxagent.pa.deprovision.default import DeprovisionHandler, \ + DeprovisionAction + +class ClearLinuxDeprovisionHandler(DeprovisionHandler): + def __init__(self, distro): + self.distro = distro + + def setup(self, deluser): + warnings, actions = super(ClearLinuxDeprovisionHandler, self).setup(deluser) + # Probably should just wipe /etc and /var here + return warnings, actions diff --git a/azurelinuxagent/pa/deprovision/default.py b/azurelinuxagent/pa/deprovision/default.py index b570c31..a702d3f 100644 --- a/azurelinuxagent/pa/deprovision/default.py +++ b/azurelinuxagent/pa/deprovision/default.py @@ -17,6 +17,8 @@ # Requires Python 2.4+ and Openssl 1.0+ # +import signal +import sys import azurelinuxagent.common.conf as conf from azurelinuxagent.common.exception import ProtocolError from azurelinuxagent.common.future import read_input @@ -38,6 +40,7 @@ class DeprovisionHandler(object): def __init__(self): self.osutil = get_osutil() self.protocol_util = get_protocol_util() + signal.signal(signal.SIGINT, self.handle_interrupt_signal) def del_root_password(self, warnings, actions): warnings.append("WARNING! root password will be disabled. " @@ -63,8 +66,8 @@ class DeprovisionHandler(object): def regen_ssh_host_key(self, warnings, actions): warnings.append("WARNING! All SSH host key pairs will be deleted.") - actions.append(DeprovisionAction(shellutil.run, - ['rm -f /etc/ssh/ssh_host_*key*'])) + actions.append(DeprovisionAction(fileutil.rm_files, + ['/etc/ssh/ssh_host_*key*'])) def stop_agent_service(self, warnings, actions): warnings.append("WARNING! The waagent service will be stopped.") @@ -74,6 +77,11 @@ class DeprovisionHandler(object): files_to_del = ['/root/.bash_history', '/var/log/waagent.log'] actions.append(DeprovisionAction(fileutil.rm_files, files_to_del)) + def del_resolv(self, warnings, actions): + warnings.append("WARNING! 
/etc/resolv.conf will be deleted.") + files_to_del = ["/etc/resolv.conf"] + actions.append(DeprovisionAction(fileutil.rm_files, files_to_del)) + def del_dhcp_lease(self, warnings, actions): warnings.append("WARNING! Cached DHCP leases will be deleted.") dirs_to_del = ["/var/lib/dhclient", "/var/lib/dhcpcd", "/var/lib/dhcp"] @@ -109,6 +117,7 @@ class DeprovisionHandler(object): self.del_lib_dir(warnings, actions) self.del_files(warnings, actions) + self.del_resolv(warnings, actions) if deluser: self.del_user(warnings, actions) @@ -128,4 +137,8 @@ class DeprovisionHandler(object): for action in actions: action.invoke() + def handle_interrupt_signal(self, frame): + print("Deprovision is interrupted.") + sys.exit(0) + diff --git a/azurelinuxagent/pa/deprovision/factory.py b/azurelinuxagent/pa/deprovision/factory.py index dd01633..72a5be1 100644 --- a/azurelinuxagent/pa/deprovision/factory.py +++ b/azurelinuxagent/pa/deprovision/factory.py @@ -21,6 +21,7 @@ from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \ DISTRO_FULL_NAME from .default import DeprovisionHandler +from .clearlinux import ClearLinuxDeprovisionHandler from .coreos import CoreOSDeprovisionHandler from .ubuntu import UbuntuDeprovisionHandler @@ -31,6 +32,8 @@ def get_deprovision_handler(distro_name=DISTRO_NAME, return UbuntuDeprovisionHandler() if distro_name == "coreos": return CoreOSDeprovisionHandler() + if distro_name == "clear linux": + return ClearLinuxDeprovisionHandler() return DeprovisionHandler() diff --git a/azurelinuxagent/pa/deprovision/ubuntu.py b/azurelinuxagent/pa/deprovision/ubuntu.py index 14f90de..b45d415 100644 --- a/azurelinuxagent/pa/deprovision/ubuntu.py +++ b/azurelinuxagent/pa/deprovision/ubuntu.py @@ -18,30 +18,25 @@ # import os -import azurelinuxagent.common.logger as logger import azurelinuxagent.common.utils.fileutil as fileutil from azurelinuxagent.pa.deprovision.default import DeprovisionHandler, \ - DeprovisionAction - -def del_resolv(): - if 
os.path.realpath('/etc/resolv.conf') != '/run/resolvconf/resolv.conf': - logger.info("resolvconf is not configured. Removing /etc/resolv.conf") - fileutil.rm_files('/etc/resolv.conf') - else: - logger.info("resolvconf is enabled; leaving /etc/resolv.conf intact") - fileutil.rm_files('/etc/resolvconf/resolv.conf.d/tail', - '/etc/resolvconf/resolv.conf.d/originial') + DeprovisionAction class UbuntuDeprovisionHandler(DeprovisionHandler): def __init__(self): super(UbuntuDeprovisionHandler, self).__init__() - def setup(self, deluser): - warnings, actions = super(UbuntuDeprovisionHandler, self).setup(deluser) - warnings.append("WARNING! Nameserver configuration in " - "/etc/resolvconf/resolv.conf.d/{tail,originial} " - "will be deleted.") - actions.append(DeprovisionAction(del_resolv)) - return warnings, actions - + def del_resolv(self, warnings, actions): + if os.path.realpath( + '/etc/resolv.conf') != '/run/resolvconf/resolv.conf': + warnings.append("WARNING! /etc/resolv.conf will be deleted.") + files_to_del = ["/etc/resolv.conf"] + actions.append(DeprovisionAction(fileutil.rm_files, files_to_del)) + else: + warnings.append("WARNING! 
/etc/resolvconf/resolv.conf.d/tail " + "and /etc/resolvconf/resolv.conf.d/original will " + "be deleted.") + files_to_del = ["/etc/resolvconf/resolv.conf.d/tail", + "/etc/resolvconf/resolv.conf.d/original"] + actions.append(DeprovisionAction(fileutil.rm_files, files_to_del)) diff --git a/azurelinuxagent/pa/provision/default.py b/azurelinuxagent/pa/provision/default.py index b07c147..e851036 100644 --- a/azurelinuxagent/pa/provision/default.py +++ b/azurelinuxagent/pa/provision/default.py @@ -25,69 +25,71 @@ from azurelinuxagent.common.future import ustr import azurelinuxagent.common.conf as conf from azurelinuxagent.common.event import add_event, WALAEventOperation from azurelinuxagent.common.exception import ProvisionError, ProtocolError, \ - OSUtilError + OSUtilError from azurelinuxagent.common.protocol.restapi import ProvisionStatus import azurelinuxagent.common.utils.shellutil as shellutil import azurelinuxagent.common.utils.fileutil as fileutil from azurelinuxagent.common.osutil import get_osutil from azurelinuxagent.common.protocol import get_protocol_util -CUSTOM_DATA_FILE="CustomData" +CUSTOM_DATA_FILE = "CustomData" -class ProvisionHandler(object): +class ProvisionHandler(object): def __init__(self): self.osutil = get_osutil() self.protocol_util = get_protocol_util() def run(self): - #If provision is not enabled, return - if not conf.get_provision_enabled(): - logger.info("Provisioning is disabled. 
Skip.") - return - + # if provisioning is already done, return provisioned = os.path.join(conf.get_lib_dir(), "provisioned") if os.path.isfile(provisioned): + logger.info("Provisioning already completed, skipping.") return - logger.info("Run provision handler.") - logger.info("Copy ovf-env.xml.") - try: - ovfenv = self.protocol_util.copy_ovf_env() - except ProtocolError as e: - self.report_event("Failed to copy ovf-env.xml: {0}".format(e)) - return - - self.protocol_util.get_protocol_by_file() - - self.report_not_ready("Provisioning", "Starting") - - try: - logger.info("Start provisioning") - self.provision(ovfenv) - fileutil.write_file(provisioned, "") - thumbprint = self.reg_ssh_host_key() - logger.info("Finished provisioning") - except ProvisionError as e: - logger.error("Provision failed: {0}", e) - self.report_not_ready("ProvisioningFailed", ustr(e)) - self.report_event(ustr(e)) - return - + thumbprint = None + # If provision is not enabled, report ready and then return + if not conf.get_provision_enabled(): + logger.info("Provisioning is disabled, skipping.") + else: + logger.info("Running provisioning handler") + try: + logger.info("Copying ovf-env.xml") + ovf_env = self.protocol_util.copy_ovf_env() + self.protocol_util.get_protocol_by_file() + self.report_not_ready("Provisioning", "Starting") + logger.info("Starting provisioning") + self.provision(ovf_env) + thumbprint = self.reg_ssh_host_key() + self.osutil.restart_ssh_service() + self.report_event("Provision succeed", is_success=True) + except ProtocolError as e: + logger.error("[ProtocolError] Provisioning failed: {0}", e) + self.report_not_ready("ProvisioningFailed", ustr(e)) + self.report_event("Failed to copy ovf-env.xml: {0}".format(e)) + return + except ProvisionError as e: + logger.error("[ProvisionError] Provisioning failed: {0}", e) + self.report_not_ready("ProvisioningFailed", ustr(e)) + self.report_event(ustr(e)) + return + # write out provisioned file and report Ready + 
fileutil.write_file(provisioned, "") self.report_ready(thumbprint) - self.report_event("Provision succeed", is_success=True) - + logger.info("Provisioning complete") + def reg_ssh_host_key(self): keypair_type = conf.get_ssh_host_keypair_type() if conf.get_regenerate_ssh_host_key(): - shellutil.run("rm -f /etc/ssh/ssh_host_*key*") - shellutil.run(("ssh-keygen -N '' -t {0} -f /etc/ssh/ssh_host_{1}_key" - "").format(keypair_type, keypair_type)) + fileutil.rm_files("/etc/ssh/ssh_host_*key*") + keygen_cmd = "ssh-keygen -N '' -t {0} -f /etc/ssh/ssh_host_{1}_key" + shellutil.run(keygen_cmd.format(keypair_type, keypair_type)) thumbprint = self.get_ssh_host_key_thumbprint(keypair_type) return thumbprint def get_ssh_host_key_thumbprint(self, keypair_type): - cmd = "ssh-keygen -lf /etc/ssh/ssh_host_{0}_key.pub".format(keypair_type) + cmd = "ssh-keygen -lf /etc/ssh/ssh_host_{0}_key.pub".format( + keypair_type) ret = shellutil.run_get_output(cmd) if ret[0] == 0: return ret[1].rstrip().split()[1].replace(':', '') @@ -107,13 +109,13 @@ class ProvisionHandler(object): self.config_user_account(ovfenv) self.save_customdata(ovfenv) - + if conf.get_delete_root_password(): self.osutil.del_root_password() except OSUtilError as e: raise ProvisionError("Failed to handle ovf-env.xml: {0}".format(e)) - + def config_user_account(self, ovfenv): logger.info("Create user account if not exists") self.osutil.useradd(ovfenv.username) @@ -123,27 +125,18 @@ class ProvisionHandler(object): crypt_id = conf.get_password_cryptid() salt_len = conf.get_password_crypt_salt_len() self.osutil.chpasswd(ovfenv.username, ovfenv.user_password, - crypt_id=crypt_id, salt_len=salt_len) - + crypt_id=crypt_id, salt_len=salt_len) + logger.info("Configure sudoer") - self.osutil.conf_sudoer(ovfenv.username, nopasswd=ovfenv.user_password is None) + self.osutil.conf_sudoer(ovfenv.username, + nopasswd=ovfenv.user_password is None) logger.info("Configure sshd") self.osutil.conf_sshd(ovfenv.disable_ssh_password_auth) - 
#Disable selinux temporary - sel = self.osutil.is_selinux_enforcing() - if sel: - self.osutil.set_selinux_enforce(0) - self.deploy_ssh_pubkeys(ovfenv) self.deploy_ssh_keypairs(ovfenv) - if sel: - self.osutil.set_selinux_enforce(1) - - self.osutil.restart_ssh_service() - def save_customdata(self, ovfenv): customdata = ovfenv.customdata if customdata is None: @@ -152,11 +145,11 @@ class ProvisionHandler(object): logger.info("Save custom data") lib_dir = conf.get_lib_dir() if conf.get_decode_customdata(): - customdata= self.osutil.decode_customdata(customdata) + customdata = self.osutil.decode_customdata(customdata) customdata_file = os.path.join(lib_dir, CUSTOM_DATA_FILE) fileutil.write_file(customdata_file, customdata) - + if conf.get_execute_customdata(): logger.info("Execute custom data") os.chmod(customdata_file, 0o700) @@ -183,6 +176,7 @@ class ProvisionHandler(object): protocol = self.protocol_util.get_protocol() protocol.report_provision_status(status) except ProtocolError as e: + logger.error("Reporting NotReady failed: {0}", e) self.report_event(ustr(e)) def report_ready(self, thumbprint=None): @@ -192,5 +186,5 @@ class ProvisionHandler(object): protocol = self.protocol_util.get_protocol() protocol.report_provision_status(status) except ProtocolError as e: + logger.error("Reporting Ready failed: {0}", e) self.report_event(ustr(e)) - diff --git a/azurelinuxagent/pa/provision/ubuntu.py b/azurelinuxagent/pa/provision/ubuntu.py index c334f23..7f2bce3 100644 --- a/azurelinuxagent/pa/provision/ubuntu.py +++ b/azurelinuxagent/pa/provision/ubuntu.py @@ -32,12 +32,14 @@ from azurelinuxagent.pa.provision.default import ProvisionHandler """ On ubuntu image, provision could be disabled. 
""" + + class UbuntuProvisionHandler(ProvisionHandler): def __init__(self): super(UbuntuProvisionHandler, self).__init__() def run(self): - #If provision is enabled, run default provision handler + # If provision is enabled, run default provision handler if conf.get_provision_enabled(): super(UbuntuProvisionHandler, self).run() return @@ -49,23 +51,21 @@ class UbuntuProvisionHandler(ProvisionHandler): logger.info("Waiting cloud-init to copy ovf-env.xml.") self.wait_for_ovfenv() - - protocol = self.protocol_util.get_protocol() + self.protocol_util.get_protocol() self.report_not_ready("Provisioning", "Starting") - logger.info("Sleep 15 seconds to prevent throttling") - time.sleep(15) #Sleep to prevent throttling + logger.info("Sleeping 1 second to avoid throttling.") + time.sleep(1) try: logger.info("Wait for ssh host key to be generated.") thumbprint = self.wait_for_ssh_host_key() fileutil.write_file(provisioned, "") logger.info("Finished provisioning") - except ProvisionError as e: logger.error("Provision failed: {0}", e) self.report_not_ready("ProvisioningFailed", ustr(e)) self.report_event(ustr(e)) return - + self.report_ready(thumbprint) self.report_event("Provision succeed", is_success=True) diff --git a/azurelinuxagent/pa/rdma/centos.py b/azurelinuxagent/pa/rdma/centos.py index c527e1b..8ad09c5 100644 --- a/azurelinuxagent/pa/rdma/centos.py +++ b/azurelinuxagent/pa/rdma/centos.py @@ -20,6 +20,7 @@ import glob import os import re +import time import azurelinuxagent.common.logger as logger import azurelinuxagent.common.utils.shellutil as shellutil from azurelinuxagent.common.rdma import RDMAHandler @@ -47,6 +48,17 @@ class CentOSRDMAHandler(RDMAHandler): Install the KVP daemon and the appropriate RDMA driver package for the RDMA firmware. """ + + # Check and install the KVP deamon if it not running + time.sleep(10) # give some time for the hv_hvp_daemon to start up. 
+ kvpd_running = RDMAHandler.is_kvp_daemon_running() + logger.info('RDMA: kvp daemon running: %s' % kvpd_running) + if not kvpd_running: + self.check_or_install_kvp_daemon() + time.sleep(10) # wait for post-install reboot or kvp to come up + + # Find out RDMA firmware version and see if the existing package needs + # updating or if the package is missing altogether (and install it) fw_version = RDMAHandler.get_rdma_version() if not fw_version: raise Exception('Cannot determine RDMA firmware version') @@ -187,12 +199,20 @@ class CentOSRDMAHandler(RDMAHandler): raise Exception( "Failed to install RDMA {0} package".format(pkg_type)) + @staticmethod + def is_package_installed(pkg): + """Runs rpm -q and checks return code to find out if a package + is installed""" + return shellutil.run("rpm -q %s" % pkg, chk_err=False) == 0 + def uninstall_kvp_driver_package_if_exists(self): + logger.info('RDMA: deleting existing kvp driver packages') + kvp_pkgs = [self.hyper_v_package_name, self.hyper_v_package_name_new] for kvp_pkg in kvp_pkgs: - if shellutil.run("rpm -q %s" % kvp_pkg, chk_err=False) != 0: + if not self.is_package_installed(kvp_pkg): logger.info( "RDMA: kvp package %s does not exist, skipping" % kvp_pkg) else: @@ -201,3 +221,26 @@ class CentOSRDMAHandler(RDMAHandler): logger.info("RDMA: successfully erased package") else: logger.error("RDMA: failed to erase package") + + def check_or_install_kvp_daemon(self): + """Checks if kvp daemon package is installed, if not installs the + package and reboots the machine. 
+ """ + logger.info("RDMA: Checking kvp daemon packages.") + kvp_pkgs = [self.hyper_v_package_name, + self.hyper_v_package_name_new] + + for pkg in kvp_pkgs: + logger.info("RDMA: Checking if package %s installed" % pkg) + installed = self.is_package_installed(pkg) + if installed: + raise Exception('RDMA: package %s is installed, but the kvp daemon is not running' % pkg) + + kvp_pkg_to_install=self.hyper_v_package_name + logger.info("RDMA: no kvp drivers installed, will install '%s'" % kvp_pkg_to_install) + logger.info("RDMA: trying to install kvp package '%s'" % kvp_pkg_to_install) + if self.install_package(kvp_pkg_to_install) != 0: + raise Exception("RDMA: failed to install kvp daemon package '%s'" % kvp_pkg_to_install) + logger.info("RDMA: package '%s' successfully installed" % kvp_pkg_to_install) + logger.info("RDMA: Machine will now be rebooted.") + self.reboot_system()
\ No newline at end of file