summaryrefslogtreecommitdiff
path: root/azurelinuxagent/utils
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/utils')
-rw-r--r--azurelinuxagent/utils/__init__.py19
-rw-r--r--azurelinuxagent/utils/cryptutil.py121
-rw-r--r--azurelinuxagent/utils/fileutil.py187
-rw-r--r--azurelinuxagent/utils/restutil.py156
-rw-r--r--azurelinuxagent/utils/shellutil.py89
-rw-r--r--azurelinuxagent/utils/textutil.py236
6 files changed, 0 insertions, 808 deletions
diff --git a/azurelinuxagent/utils/__init__.py b/azurelinuxagent/utils/__init__.py
deleted file mode 100644
index d9b82f5..0000000
--- a/azurelinuxagent/utils/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-#
-
diff --git a/azurelinuxagent/utils/cryptutil.py b/azurelinuxagent/utils/cryptutil.py
deleted file mode 100644
index 5ee5637..0000000
--- a/azurelinuxagent/utils/cryptutil.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-#
-
-import base64
-import struct
-from azurelinuxagent.future import ustr, bytebuffer
-from azurelinuxagent.exception import CryptError
-import azurelinuxagent.utils.shellutil as shellutil
-
-class CryptUtil(object):
- def __init__(self, openssl_cmd):
- self.openssl_cmd = openssl_cmd
-
- def gen_transport_cert(self, prv_file, crt_file):
- """
- Create ssl certificate for https communication with endpoint server.
- """
- cmd = ("{0} req -x509 -nodes -subj /CN=LinuxTransport -days 32768 "
- "-newkey rsa:2048 -keyout {1} "
- "-out {2}").format(self.openssl_cmd, prv_file, crt_file)
- shellutil.run(cmd)
-
- def get_pubkey_from_prv(self, file_name):
- cmd = "{0} rsa -in {1} -pubout 2>/dev/null".format(self.openssl_cmd,
- file_name)
- pub = shellutil.run_get_output(cmd)[1]
- return pub
-
- def get_pubkey_from_crt(self, file_name):
- cmd = "{0} x509 -in {1} -pubkey -noout".format(self.openssl_cmd,
- file_name)
- pub = shellutil.run_get_output(cmd)[1]
- return pub
-
- def get_thumbprint_from_crt(self, file_name):
- cmd="{0} x509 -in {1} -fingerprint -noout".format(self.openssl_cmd,
- file_name)
- thumbprint = shellutil.run_get_output(cmd)[1]
- thumbprint = thumbprint.rstrip().split('=')[1].replace(':', '').upper()
- return thumbprint
-
- def decrypt_p7m(self, p7m_file, trans_prv_file, trans_cert_file, pem_file):
- cmd = ("{0} cms -decrypt -in {1} -inkey {2} -recip {3} "
- "| {4} pkcs12 -nodes -password pass: -out {5}"
- "").format(self.openssl_cmd, p7m_file, trans_prv_file,
- trans_cert_file, self.openssl_cmd, pem_file)
- shellutil.run(cmd)
-
- def crt_to_ssh(self, input_file, output_file):
- shellutil.run("ssh-keygen -i -m PKCS8 -f {0} >> {1}".format(input_file,
- output_file))
-
- def asn1_to_ssh(self, pubkey):
- lines = pubkey.split("\n")
- lines = [x for x in lines if not x.startswith("----")]
- base64_encoded = "".join(lines)
- try:
- #TODO remove pyasn1 dependency
- from pyasn1.codec.der import decoder as der_decoder
- der_encoded = base64.b64decode(base64_encoded)
- der_encoded = der_decoder.decode(der_encoded)[0][1]
- key = der_decoder.decode(self.bits_to_bytes(der_encoded))[0]
- n=key[0]
- e=key[1]
- keydata = bytearray()
- keydata.extend(struct.pack('>I', len("ssh-rsa")))
- keydata.extend(b"ssh-rsa")
- keydata.extend(struct.pack('>I', len(self.num_to_bytes(e))))
- keydata.extend(self.num_to_bytes(e))
- keydata.extend(struct.pack('>I', len(self.num_to_bytes(n)) + 1))
- keydata.extend(b"\0")
- keydata.extend(self.num_to_bytes(n))
- keydata_base64 = base64.b64encode(bytebuffer(keydata))
- return ustr(b"ssh-rsa " + keydata_base64 + b"\n",
- encoding='utf-8')
- except ImportError as e:
- raise CryptError("Failed to load pyasn1.codec.der")
-
- def num_to_bytes(self, num):
- """
- Pack number into bytes. Retun as string.
- """
- result = bytearray()
- while num:
- result.append(num & 0xFF)
- num >>= 8
- result.reverse()
- return result
-
- def bits_to_bytes(self, bits):
- """
- Convert an array contains bits, [0,1] to a byte array
- """
- index = 7
- byte_array = bytearray()
- curr = 0
- for bit in bits:
- curr = curr | (bit << index)
- index = index - 1
- if index == -1:
- byte_array.append(curr)
- curr = 0
- index = 7
- return bytes(byte_array)
-
diff --git a/azurelinuxagent/utils/fileutil.py b/azurelinuxagent/utils/fileutil.py
deleted file mode 100644
index 5369a7c..0000000
--- a/azurelinuxagent/utils/fileutil.py
+++ /dev/null
@@ -1,187 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-#
-
-"""
-File operation util functions
-"""
-
-import os
-import re
-import shutil
-import pwd
-import tempfile
-import azurelinuxagent.logger as logger
-from azurelinuxagent.future import ustr
-import azurelinuxagent.utils.textutil as textutil
-
-def read_file(filepath, asbin=False, remove_bom=False, encoding='utf-8'):
- """
- Read and return contents of 'filepath'.
- """
- mode = 'rb'
- with open(filepath, mode) as in_file:
- data = in_file.read()
- if data is None:
- return None
-
- if asbin:
- return data
-
- if remove_bom:
- #Remove bom on bytes data before it is converted into string.
- data = textutil.remove_bom(data)
- data = ustr(data, encoding=encoding)
- return data
-
-def write_file(filepath, contents, asbin=False, encoding='utf-8', append=False):
- """
- Write 'contents' to 'filepath'.
- """
- mode = "ab" if append else "wb"
- data = contents
- if not asbin:
- data = contents.encode(encoding)
- with open(filepath, mode) as out_file:
- out_file.write(data)
-
-def append_file(filepath, contents, asbin=False, encoding='utf-8'):
- """
- Append 'contents' to 'filepath'.
- """
- write_file(filepath, contents, asbin=asbin, encoding=encoding, append=True)
-
-def replace_file(filepath, contents):
- """
- Write 'contents' to 'filepath' by creating a temp file,
- and replacing original.
- """
- handle, temp = tempfile.mkstemp(dir=os.path.dirname(filepath))
- #if type(contents) == str:
- #contents=contents.encode('latin-1')
- try:
- os.write(handle, contents)
- except IOError as err:
- logger.error('Write to file {0}, Exception is {1}', filepath, err)
- return 1
- finally:
- os.close(handle)
-
- try:
- os.rename(temp, filepath)
- except IOError as err:
- logger.info('Rename {0} to {1}, Exception is {2}', temp, filepath, err)
- logger.info('Remove original file and retry')
- try:
- os.remove(filepath)
- except IOError as err:
- logger.error('Remove {0}, Exception is {1}', temp, filepath, err)
-
- try:
- os.rename(temp, filepath)
- except IOError as err:
- logger.error('Rename {0} to {1}, Exception is {2}', temp, filepath,
- err)
- return 1
- return 0
-
-
-def base_name(path):
- head, tail = os.path.split(path)
- return tail
-
-def get_line_startingwith(prefix, filepath):
- """
- Return line from 'filepath' if the line startswith 'prefix'
- """
- for line in read_file(filepath).split('\n'):
- if line.startswith(prefix):
- return line
- return None
-
-#End File operation util functions
-
-def mkdir(dirpath, mode=None, owner=None):
- if not os.path.isdir(dirpath):
- os.makedirs(dirpath)
- if mode is not None:
- chmod(dirpath, mode)
- if owner is not None:
- chowner(dirpath, owner)
-
-def chowner(path, owner):
- owner_info = pwd.getpwnam(owner)
- os.chown(path, owner_info[2], owner_info[3])
-
-def chmod(path, mode):
- os.chmod(path, mode)
-
-def rm_files(*args):
- for path in args:
- if os.path.isfile(path):
- os.remove(path)
-
-def rm_dirs(*args):
- """
- Remove all the contents under the directry
- """
- for dir_name in args:
- if os.path.isdir(dir_name):
- for item in os.listdir(dir_name):
- path = os.path.join(dir_name, item)
- if os.path.isfile(path):
- os.remove(path)
- elif os.path.isdir(path):
- shutil.rmtree(path)
-
-def update_conf_file(path, line_start, val, chk_err=False):
- conf = []
- if not os.path.isfile(path) and chk_err:
- raise IOError("Can't find config file:{0}".format(path))
- conf = read_file(path).split('\n')
- conf = [x for x in conf if not x.startswith(line_start)]
- conf.append(val)
- replace_file(path, '\n'.join(conf))
-
-def search_file(target_dir_name, target_file_name):
- for root, dirs, files in os.walk(target_dir_name):
- for file_name in files:
- if file_name == target_file_name:
- return os.path.join(root, file_name)
- return None
-
-def chmod_tree(path, mode):
- for root, dirs, files in os.walk(path):
- for file_name in files:
- os.chmod(os.path.join(root, file_name), mode)
-
-def findstr_in_file(file_path, pattern_str):
- """
- Return match object if found in file.
- """
- try:
- pattern = re.compile(pattern_str)
- for line in (open(file_path, 'r')).readlines():
- match = re.search(pattern, line)
- if match:
- return match
- except:
- raise
-
- return None
-
diff --git a/azurelinuxagent/utils/restutil.py b/azurelinuxagent/utils/restutil.py
deleted file mode 100644
index 2e8b0be..0000000
--- a/azurelinuxagent/utils/restutil.py
+++ /dev/null
@@ -1,156 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-#
-
-import time
-import platform
-import os
-import subprocess
-import azurelinuxagent.conf as conf
-import azurelinuxagent.logger as logger
-from azurelinuxagent.exception import HttpError
-from azurelinuxagent.future import httpclient, urlparse
-
-"""
-REST api util functions
-"""
-
-RETRY_WAITING_INTERVAL = 10
-
-def _parse_url(url):
- o = urlparse(url)
- rel_uri = o.path
- if o.fragment:
- rel_uri = "{0}#{1}".format(rel_uri, o.fragment)
- if o.query:
- rel_uri = "{0}?{1}".format(rel_uri, o.query)
- secure = False
- if o.scheme.lower() == "https":
- secure = True
- return o.hostname, o.port, secure, rel_uri
-
-def get_http_proxy():
- """
- Get http_proxy and https_proxy from environment variables.
- Username and password is not supported now.
- """
- host = conf.get_httpproxy_host()
- port = conf.get_httpproxy_port()
- return (host, port)
-
-def _http_request(method, host, rel_uri, port=None, data=None, secure=False,
- headers=None, proxy_host=None, proxy_port=None):
- url, conn = None, None
- if secure:
- port = 443 if port is None else port
- if proxy_host is not None and proxy_port is not None:
- conn = httpclient.HTTPSConnection(proxy_host, proxy_port, timeout=10)
- conn.set_tunnel(host, port)
- #If proxy is used, full url is needed.
- url = "https://{0}:{1}{2}".format(host, port, rel_uri)
- else:
- conn = httpclient.HTTPSConnection(host, port, timeout=10)
- url = rel_uri
- else:
- port = 80 if port is None else port
- if proxy_host is not None and proxy_port is not None:
- conn = httpclient.HTTPConnection(proxy_host, proxy_port, timeout=10)
- #If proxy is used, full url is needed.
- url = "http://{0}:{1}{2}".format(host, port, rel_uri)
- else:
- conn = httpclient.HTTPConnection(host, port, timeout=10)
- url = rel_uri
- if headers == None:
- conn.request(method, url, data)
- else:
- conn.request(method, url, data, headers)
- resp = conn.getresponse()
- return resp
-
-def http_request(method, url, data, headers=None, max_retry=3, chk_proxy=False):
- """
- Sending http request to server
- On error, sleep 10 and retry max_retry times.
- """
- logger.verb("HTTP Req: {0} {1}", method, url)
- logger.verb(" Data={0}", data)
- logger.verb(" Header={0}", headers)
- host, port, secure, rel_uri = _parse_url(url)
-
- #Check proxy
- proxy_host, proxy_port = (None, None)
- if chk_proxy:
- proxy_host, proxy_port = get_http_proxy()
-
- #If httplib module is not built with ssl support. Fallback to http
- if secure and not hasattr(httpclient, "HTTPSConnection"):
- logger.warn("httplib is not built with ssl support")
- secure = False
-
- #If httplib module doesn't support https tunnelling. Fallback to http
- if secure and \
- proxy_host is not None and \
- proxy_port is not None and \
- not hasattr(httpclient.HTTPSConnection, "set_tunnel"):
- logger.warn("httplib doesn't support https tunnelling(new in python 2.7)")
- secure = False
-
- for retry in range(0, max_retry):
- try:
- resp = _http_request(method, host, rel_uri, port=port, data=data,
- secure=secure, headers=headers,
- proxy_host=proxy_host, proxy_port=proxy_port)
- logger.verb("HTTP Resp: Status={0}", resp.status)
- logger.verb(" Header={0}", resp.getheaders())
- return resp
- except httpclient.HTTPException as e:
- logger.warn('HTTPException {0}, args:{1}', e, repr(e.args))
- except IOError as e:
- logger.warn('Socket IOError {0}, args:{1}', e, repr(e.args))
-
- if retry < max_retry - 1:
- logger.info("Retry={0}, {1} {2}", retry, method, url)
- time.sleep(RETRY_WAITING_INTERVAL)
-
- if url is not None and len(url) > 100:
- url_log = url[0: 100] #In case the url is too long
- else:
- url_log = url
- raise HttpError("HTTP Err: {0} {1}".format(method, url_log))
-
-def http_get(url, headers=None, max_retry=3, chk_proxy=False):
- return http_request("GET", url, data=None, headers=headers,
- max_retry=max_retry, chk_proxy=chk_proxy)
-
-def http_head(url, headers=None, max_retry=3, chk_proxy=False):
- return http_request("HEAD", url, None, headers=headers,
- max_retry=max_retry, chk_proxy=chk_proxy)
-
-def http_post(url, data, headers=None, max_retry=3, chk_proxy=False):
- return http_request("POST", url, data, headers=headers,
- max_retry=max_retry, chk_proxy=chk_proxy)
-
-def http_put(url, data, headers=None, max_retry=3, chk_proxy=False):
- return http_request("PUT", url, data, headers=headers,
- max_retry=max_retry, chk_proxy=chk_proxy)
-
-def http_delete(url, headers=None, max_retry=3, chk_proxy=False):
- return http_request("DELETE", url, None, headers=headers,
- max_retry=max_retry, chk_proxy=chk_proxy)
-
-#End REST api util functions
diff --git a/azurelinuxagent/utils/shellutil.py b/azurelinuxagent/utils/shellutil.py
deleted file mode 100644
index 98871a1..0000000
--- a/azurelinuxagent/utils/shellutil.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-#
-
-import platform
-import os
-import subprocess
-from azurelinuxagent.future import ustr
-import azurelinuxagent.logger as logger
-
-if not hasattr(subprocess,'check_output'):
- def check_output(*popenargs, **kwargs):
- r"""Backport from subprocess module from python 2.7"""
- if 'stdout' in kwargs:
- raise ValueError('stdout argument not allowed, '
- 'it will be overridden.')
- process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
- output, unused_err = process.communicate()
- retcode = process.poll()
- if retcode:
- cmd = kwargs.get("args")
- if cmd is None:
- cmd = popenargs[0]
- raise subprocess.CalledProcessError(retcode, cmd, output=output)
- return output
-
- # Exception classes used by this module.
- class CalledProcessError(Exception):
- def __init__(self, returncode, cmd, output=None):
- self.returncode = returncode
- self.cmd = cmd
- self.output = output
- def __str__(self):
- return ("Command '{0}' returned non-zero exit status {1}"
- "").format(self.cmd, self.returncode)
-
- subprocess.check_output=check_output
- subprocess.CalledProcessError=CalledProcessError
-
-
-"""
-Shell command util functions
-"""
-def run(cmd, chk_err=True):
- """
- Calls run_get_output on 'cmd', returning only the return code.
- If chk_err=True then errors will be reported in the log.
- If chk_err=False then errors will be suppressed from the log.
- """
- retcode,out=run_get_output(cmd,chk_err)
- return retcode
-
-def run_get_output(cmd, chk_err=True, log_cmd=True):
- """
- Wrapper for subprocess.check_output.
- Execute 'cmd'. Returns return code and STDOUT, trapping expected exceptions.
- Reports exceptions to Error if chk_err parameter is True
- """
- if log_cmd:
- logger.verb(u"run cmd '{0}'", cmd)
- try:
- output=subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True)
- output = ustr(output, encoding='utf-8', errors="backslashreplace")
- except subprocess.CalledProcessError as e :
- output = ustr(e.output, encoding='utf-8', errors="backslashreplace")
- if chk_err:
- if log_cmd:
- logger.error(u"run cmd '{0}' failed", e.cmd)
- logger.error(u"Error Code:{0}", e.returncode)
- logger.error(u"Result:{0}", output)
- return e.returncode, output
- return 0, output
-
-#End shell command util functions
diff --git a/azurelinuxagent/utils/textutil.py b/azurelinuxagent/utils/textutil.py
deleted file mode 100644
index 851f98a..0000000
--- a/azurelinuxagent/utils/textutil.py
+++ /dev/null
@@ -1,236 +0,0 @@
-# Microsoft Azure Linux Agent
-#
-# Copyright 2014 Microsoft Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Requires Python 2.4+ and Openssl 1.0+
-
-import crypt
-import random
-import string
-import struct
-import xml.dom.minidom as minidom
-import sys
-from distutils.version import LooseVersion
-
-def parse_doc(xml_text):
- """
- Parse xml document from string
- """
- #The minidom lib has some issue with unicode in python2.
- #Encode the string into utf-8 first
- xml_text = xml_text.encode('utf-8')
- return minidom.parseString(xml_text)
-
-def findall(root, tag, namespace=None):
- """
- Get all nodes by tag and namespace under Node root.
- """
- if root is None:
- return []
-
- if namespace is None:
- return root.getElementsByTagName(tag)
- else:
- return root.getElementsByTagNameNS(namespace, tag)
-
-def find(root, tag, namespace=None):
- """
- Get first node by tag and namespace under Node root.
- """
- nodes = findall(root, tag, namespace=namespace)
- if nodes is not None and len(nodes) >= 1:
- return nodes[0]
- else:
- return None
-
-def gettext(node):
- """
- Get node text
- """
- if node is None:
- return None
-
- for child in node.childNodes:
- if child.nodeType == child.TEXT_NODE:
- return child.data
- return None
-
-def findtext(root, tag, namespace=None):
- """
- Get text of node by tag and namespace under Node root.
- """
- node = find(root, tag, namespace=namespace)
- return gettext(node)
-
-def getattrib(node, attr_name):
- """
- Get attribute of xml node
- """
- if node is not None:
- return node.getAttribute(attr_name)
- else:
- return None
-
-def unpack(buf, offset, range):
- """
- Unpack bytes into python values.
- """
- result = 0
- for i in range:
- result = (result << 8) | str_to_ord(buf[offset + i])
- return result
-
-def unpack_little_endian(buf, offset, length):
- """
- Unpack little endian bytes into python values.
- """
- return unpack(buf, offset, list(range(length - 1, -1, -1)))
-
-def unpack_big_endian(buf, offset, length):
- """
- Unpack big endian bytes into python values.
- """
- return unpack(buf, offset, list(range(0, length)))
-
-def hex_dump3(buf, offset, length):
- """
- Dump range of buf in formatted hex.
- """
- return ''.join(['%02X' % str_to_ord(char) for char in buf[offset:offset + length]])
-
-def hex_dump2(buf):
- """
- Dump buf in formatted hex.
- """
- return hex_dump3(buf, 0, len(buf))
-
-def is_in_range(a, low, high):
- """
- Return True if 'a' in 'low' <= a >= 'high'
- """
- return (a >= low and a <= high)
-
-def is_printable(ch):
- """
- Return True if character is displayable.
- """
- return (is_in_range(ch, str_to_ord('A'), str_to_ord('Z'))
- or is_in_range(ch, str_to_ord('a'), str_to_ord('z'))
- or is_in_range(ch, str_to_ord('0'), str_to_ord('9')))
-
-def hex_dump(buffer, size):
- """
- Return Hex formated dump of a 'buffer' of 'size'.
- """
- if size < 0:
- size = len(buffer)
- result = ""
- for i in range(0, size):
- if (i % 16) == 0:
- result += "%06X: " % i
- byte = buffer[i]
- if type(byte) == str:
- byte = ord(byte.decode('latin1'))
- result += "%02X " % byte
- if (i & 15) == 7:
- result += " "
- if ((i + 1) % 16) == 0 or (i + 1) == size:
- j = i
- while ((j + 1) % 16) != 0:
- result += " "
- if (j & 7) == 7:
- result += " "
- j += 1
- result += " "
- for j in range(i - (i % 16), i + 1):
- byte=buffer[j]
- if type(byte) == str:
- byte = str_to_ord(byte.decode('latin1'))
- k = '.'
- if is_printable(byte):
- k = chr(byte)
- result += k
- if (i + 1) != size:
- result += "\n"
- return result
-
-def str_to_ord(a):
- """
- Allows indexing into a string or an array of integers transparently.
- Generic utility function.
- """
- if type(a) == type(b'') or type(a) == type(u''):
- a = ord(a)
- return a
-
-def compare_bytes(a, b, start, length):
- for offset in range(start, start + length):
- if str_to_ord(a[offset]) != str_to_ord(b[offset]):
- return False
- return True
-
-def int_to_ip4_addr(a):
- """
- Build DHCP request string.
- """
- return "%u.%u.%u.%u" % ((a >> 24) & 0xFF,
- (a >> 16) & 0xFF,
- (a >> 8) & 0xFF,
- (a) & 0xFF)
-
-def hexstr_to_bytearray(a):
- """
- Return hex string packed into a binary struct.
- """
- b = b""
- for c in range(0, len(a) // 2):
- b += struct.pack("B", int(a[c * 2:c * 2 + 2], 16))
- return b
-
-def set_ssh_config(config, name, val):
- notfound = True
- for i in range(0, len(config)):
- if config[i].startswith(name):
- config[i] = "{0} {1}".format(name, val)
- notfound = False
- elif config[i].startswith("Match"):
- #Match block must be put in the end of sshd config
- break
- if notfound:
- config.insert(i, "{0} {1}".format(name, val))
- return config
-
-def remove_bom(c):
- if str_to_ord(c[0]) > 128 and str_to_ord(c[1]) > 128 and \
- str_to_ord(c[2]) > 128:
- c = c[3:]
- return c
-
-def gen_password_hash(password, crypt_id, salt_len):
- collection = string.ascii_letters + string.digits
- salt = ''.join(random.choice(collection) for _ in range(salt_len))
- salt = "${0}${1}".format(crypt_id, salt)
- return crypt.crypt(password, salt)
-
-def get_bytes_from_pem(pem_str):
- base64_bytes = ""
- for line in pem_str.split('\n'):
- if "----" not in line:
- base64_bytes += line
- return base64_bytes
-
-
-Version = LooseVersion
-