diff options
Diffstat (limited to 'tests/cloud_tests/platforms/ec2/platform.py')
-rw-r--r-- | tests/cloud_tests/platforms/ec2/platform.py | 263 |
1 files changed, 0 insertions, 263 deletions
diff --git a/tests/cloud_tests/platforms/ec2/platform.py b/tests/cloud_tests/platforms/ec2/platform.py deleted file mode 100644 index b61a2ffb..00000000 --- a/tests/cloud_tests/platforms/ec2/platform.py +++ /dev/null @@ -1,263 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Base EC2 platform.""" -from datetime import datetime -import os - -import boto3 -import botocore -from botocore import session, handlers -import base64 - -from ..platforms import Platform -from .image import EC2Image -from .instance import EC2Instance -from tests.cloud_tests import LOG - - -class EC2Platform(Platform): - """EC2 test platform.""" - - platform_name = 'ec2' - ipv4_cidr = '192.168.1.0/20' - - def __init__(self, config): - """Set up platform.""" - super(EC2Platform, self).__init__(config) - # Used for unique VPC, SSH key, and custom AMI generation naming - self.tag = '%s-%s' % ( - config['tag'], datetime.now().strftime('%Y%m%d%H%M%S')) - self.instance_type = config['instance-type'] - - try: - b3session = get_session() - self.ec2_client = b3session.client('ec2') - self.ec2_resource = b3session.resource('ec2') - self.ec2_region = b3session.region_name - self.key_name = self._upload_public_key(config) - except botocore.exceptions.NoRegionError as e: - raise RuntimeError( - 'Please configure default region in $HOME/.aws/config' - ) from e - except botocore.exceptions.NoCredentialsError as e: - raise RuntimeError( - 'Please configure ec2 credentials in $HOME/.aws/credentials' - ) from e - - self.vpc = self._create_vpc() - self.internet_gateway = self._create_internet_gateway() - self.subnet = self._create_subnet() - self.routing_table = self._create_routing_table() - self.security_group = self._create_security_group() - - def create_instance(self, properties, config, features, - image_ami, user_data=None): - """Create an instance - - @param src_img_path: image path to launch from - @param properties: image properties - @param config: image configuration - @param features: image features - @param image_ami: string of image ami ID - @param user_data: test user-data to pass to instance - @return_value: cloud_tests.instances instance - """ - return EC2Instance(self, properties, config, features, - image_ami, user_data) - - def destroy(self): - """Delete SSH keys, terminate all instances, and delete VPC.""" - for instance in self.vpc.instances.all(): - LOG.debug('waiting for instance %s termination', instance.id) - instance.terminate() - instance.wait_until_terminated() - - if self.key_name: - LOG.debug('deleting SSH key %s', self.key_name) - self.ec2_client.delete_key_pair(KeyName=self.key_name) - - if self.security_group: - LOG.debug('deleting security group %s', self.security_group.id) - self.security_group.delete() - - if self.subnet: - LOG.debug('deleting subnet %s', self.subnet.id) - self.subnet.delete() - - if self.routing_table: - LOG.debug('deleting routing table %s', self.routing_table.id) - self.routing_table.delete() - - if self.internet_gateway: - LOG.debug('deleting internet gateway %s', self.internet_gateway.id) - self.internet_gateway.detach_from_vpc(VpcId=self.vpc.id) - self.internet_gateway.delete() - - if self.vpc: - LOG.debug('deleting vpc %s', self.vpc.id) - self.vpc.delete() - - def get_image(self, img_conf): - """Get image using specified image configuration. - - Hard coded for 'amd64' based images. - - @param img_conf: configuration for image - @return_value: cloud_tests.images instance - """ - if img_conf['root-store'] == 'ebs': - root_store = 'ssd' - elif img_conf['root-store'] == 'instance-store': - root_store = 'instance' - else: - raise RuntimeError('Unknown root-store type: %s' % - (img_conf['root-store'])) - - filters = [ - 'arch=%s' % 'amd64', - 'endpoint=https://ec2.%s.amazonaws.com' % self.ec2_region, - 'region=%s' % self.ec2_region, - 'release=%s' % img_conf['release'], - 'root_store=%s' % root_store, - 'virt=hvm', - ] - - LOG.debug('finding image using streams') - image = self._query_streams(img_conf, filters) - - try: - image_ami = image['id'] - except KeyError as e: - raise RuntimeError( - 'No images found for %s!' % img_conf['release'] - ) from e - - LOG.debug('found image: %s', image_ami) - image = EC2Image(self, img_conf, image_ami) - return image - - def _create_internet_gateway(self): - """Create Internet Gateway and assign to VPC.""" - LOG.debug('creating internet gateway') - # pylint: disable=no-member - internet_gateway = self.ec2_resource.create_internet_gateway() - internet_gateway.attach_to_vpc(VpcId=self.vpc.id) - self._tag_resource(internet_gateway) - - return internet_gateway - - def _create_routing_table(self): - """Update default routing table with internet gateway. - - This sets up internet access between the VPC via the internet gateway - by configuring routing tables for IPv4 and IPv6. - """ - LOG.debug('creating routing table') - route_table = self.vpc.create_route_table() - route_table.create_route(DestinationCidrBlock='0.0.0.0/0', - GatewayId=self.internet_gateway.id) - route_table.create_route(DestinationIpv6CidrBlock='::/0', - GatewayId=self.internet_gateway.id) - route_table.associate_with_subnet(SubnetId=self.subnet.id) - self._tag_resource(route_table) - - return route_table - - def _create_security_group(self): - """Enables ingress to default VPC security group.""" - LOG.debug('creating security group') - security_group = self.vpc.create_security_group( - GroupName=self.tag, Description='integration test security group') - security_group.authorize_ingress( - IpProtocol='-1', FromPort=-1, ToPort=-1, CidrIp='0.0.0.0/0') - self._tag_resource(security_group) - - return security_group - - def _create_subnet(self): - """Generate IPv4 and IPv6 subnets for use.""" - ipv6_cidr = self.vpc.ipv6_cidr_block_association_set[0][ - 'Ipv6CidrBlock'][:-2] + '64' - - LOG.debug('creating subnet with following ranges:') - LOG.debug('ipv4: %s', self.ipv4_cidr) - LOG.debug('ipv6: %s', ipv6_cidr) - subnet = self.vpc.create_subnet(CidrBlock=self.ipv4_cidr, - Ipv6CidrBlock=ipv6_cidr) - modify_subnet = subnet.meta.client.modify_subnet_attribute - modify_subnet(SubnetId=subnet.id, - MapPublicIpOnLaunch={'Value': True}) - self._tag_resource(subnet) - - return subnet - - def _create_vpc(self): - """Setup AWS EC2 VPC or return existing VPC.""" - LOG.debug('creating new vpc') - try: - vpc = self.ec2_resource.create_vpc( # pylint: disable=no-member - CidrBlock=self.ipv4_cidr, - AmazonProvidedIpv6CidrBlock=True) - except botocore.exceptions.ClientError as e: - raise RuntimeError(e) from e - - vpc.wait_until_available() - self._tag_resource(vpc) - - return vpc - - def _tag_resource(self, resource): - """Tag a resource with the specified tag. - - This makes finding and deleting resources specific to this testing - much easier to find. - - @param resource: resource to tag - """ - tag = { - 'Key': 'Name', - 'Value': self.tag - } - resource.create_tags(Tags=[tag]) - - def _upload_public_key(self, config): - """Generate random name and upload SSH key with that name. - - @param config: platform config - @return: string of ssh key name - """ - key_file = os.path.join(config['data_dir'], config['public_key']) - with open(key_file, 'r') as file: - public_key = file.read().strip('\n') - - LOG.debug('uploading SSH key %s', self.tag) - self.ec2_client.import_key_pair(KeyName=self.tag, - PublicKeyMaterial=public_key) - - return self.tag - - -def _decode_console_output_as_bytes(parsed, **kwargs): - """Provide console output as bytes in OutputBytes. - - For this to be useful, the session has to have had the - decode_console_output handler unregistered already. - - https://github.com/boto/botocore/issues/1351 .""" - if 'Output' not in parsed: - return - orig = parsed['Output'] - handlers.decode_console_output(parsed, **kwargs) - parsed['OutputBytes'] = base64.b64decode(orig) - - -def get_session(): - mysess = session.get_session() - mysess.unregister('after-call.ec2.GetConsoleOutput', - handlers.decode_console_output) - mysess.register('after-call.ec2.GetConsoleOutput', - _decode_console_output_as_bytes) - return boto3.Session(botocore_session=mysess) - - -# vi: ts=4 expandtab |