diff options
Diffstat (limited to 'tests/unittests/test_datasource')
| -rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 153 | 
1 files changed, 91 insertions, 62 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index b9b3a479..f53715b0 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -29,20 +29,17 @@ from cloudinit.sources import DataSourceSmartOS  from mocker import MockerTestCase  import uuid -mock_returns = { +MOCK_RETURNS = {      'hostname': 'test-host',      'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',      'disable_iptables_flag': None,      'enable_motd_sys_info': None, -    'system_uuid': str(uuid.uuid4()), -    'smartdc': 'smartdc',      'test-var1': 'some data', -    'user-data': """ -#!/bin/sh -/bin/true -""", +    'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),  } +DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') +  class MockSerial(object):      """Fake a serial terminal for testing the code that @@ -50,14 +47,13 @@ class MockSerial(object):      port = None -    def __init__(self, b64encode=False): +    def __init__(self, mockdata):          self.last = None          self.last = None          self.new = True          self.count = 0          self.mocked_out = [] -        self.b64encode = b64encode -        self.b64excluded = DataSourceSmartOS.SMARTOS_NO_BASE64 +        self.mockdata = mockdata      def open(self):          return True @@ -75,12 +71,12 @@ class MockSerial(object):      def readline(self):          if self.new:              self.new = False -            if self.last in mock_returns: +            if self.last in self.mockdata:                  return 'SUCCESS\n'              else:                  return 'NOTFOUND %s\n' % self.last -        if self.last in mock_returns: +        if self.last in self.mockdata:              if not self.mocked_out:                  self.mocked_out = [x for x in self._format_out()]                  print self.mocked_out @@ -90,21 +86,16 @@ class MockSerial(object):                  return self.mocked_out[self.count - 1]      def _format_out(self): -        if self.last in mock_returns: -            _mret = mock_returns[self.last] -            if self.b64encode and \ -               self.last not in self.b64excluded: -                yield base64.b64encode(_mret) - -            else: -                try: -                    for l in _mret.splitlines(): -                        yield "%s\n" % l.rstrip() -                except: -                    yield "%s\n" % _mret.rstrip() +        if self.last in self.mockdata: +            _mret = self.mockdata[self.last] +            try: +                for l in _mret.splitlines(): +                    yield "%s\n" % l.rstrip() +            except: +                yield "%s\n" % _mret.rstrip() -            yield '\n'              yield '.' +            yield '\n'  class TestSmartOSDataSource(MockerTestCase): @@ -126,26 +117,36 @@ class TestSmartOSDataSource(MockerTestCase):          ret = apply_patches(patches)          self.unapply += ret -    def _get_ds(self, b64encode=False, sys_cfg=None): +    def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):          mod = DataSourceSmartOS +        if mockdata is None: +            mockdata = MOCK_RETURNS + +        if dmi_data is None: +            dmi_data = DMI_DATA_RETURN +          def _get_serial(*_): -            return MockSerial(b64encode=b64encode) +            return MockSerial(mockdata)          def _dmi_data(): -            return mock_returns['system_uuid'], 'smartdc' +            return dmi_data -        if not sys_cfg: +        if sys_cfg is None:              sys_cfg = {} -        data = {'sys_cfg': sys_cfg} +        if ds_cfg is not None: +            sys_cfg['datasource'] = sys_cfg.get('datasource', {}) +            sys_cfg['datasource']['SmartOS'] = ds_cfg +          self.apply_patches([(mod, 'get_serial', _get_serial)])          self.apply_patches([(mod, 'dmi_data', _dmi_data)]) -        dsrc = mod.DataSourceSmartOS( -            data.get('sys_cfg', {}), distro=None, paths=self.paths) +        dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, +                                     paths=self.paths)          return dsrc      def test_seed(self): +        # default seed should be /dev/ttyS1          dsrc = self._get_ds()          ret = dsrc.get_data()          self.assertTrue(ret) @@ -158,78 +159,106 @@ class TestSmartOSDataSource(MockerTestCase):          self.assertTrue(dsrc.is_smartdc)      def test_no_base64(self): -        sys_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} -        dsrc = self._get_ds(sys_cfg=sys_cfg) +        ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} +        dsrc = self._get_ds(ds_cfg=ds_cfg)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertTrue(dsrc.not_b64_var('test-var'))      def test_uuid(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['system_uuid'], -                          dsrc.metadata['instance-id']) +        self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])      def test_root_keys(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['root_authorized_keys'], +        self.assertEquals(MOCK_RETURNS['root_authorized_keys'],                            dsrc.metadata['public-keys'])      def test_hostname_b64(self): -        dsrc = self._get_ds(b64encode=True) +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(base64.b64encode(mock_returns['hostname']), +        self.assertEquals(MOCK_RETURNS['hostname'],                            dsrc.metadata['local-hostname'])      def test_hostname(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['hostname'], +        self.assertEquals(MOCK_RETURNS['hostname'],                            dsrc.metadata['local-hostname']) -    def test_base64(self): -        """This tests to make sure that SmartOS system key/value pairs -            are not interpetted as being base64 encoded, while making -            sure that the others are when 'decode_base64' is set""" -        dsrc = self._get_ds(sys_cfg={'decode_base64': True}, -                            b64encode=True) +    def test_base64_all(self): +        # metadata provided base64_all of true +        my_returns = MOCK_RETURNS.copy() +        my_returns['base64_all'] = "true" +        for k in ('hostname', 'user-data'): +            my_returns[k] = base64.b64encode(my_returns[k]) + +        dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['hostname'], +        self.assertEquals(MOCK_RETURNS['hostname'],                            dsrc.metadata['local-hostname']) -        self.assertEquals("%s" % mock_returns['user-data'], +        self.assertEquals(MOCK_RETURNS['user-data'],                            dsrc.userdata_raw) -        self.assertEquals(mock_returns['root_authorized_keys'], +        self.assertEquals(MOCK_RETURNS['root_authorized_keys'],                            dsrc.metadata['public-keys']) -        self.assertEquals(mock_returns['disable_iptables_flag'], +        self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],                            dsrc.metadata['iptables_disable']) -        self.assertEquals(mock_returns['enable_motd_sys_info'], +        self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],                            dsrc.metadata['motd_sys_info']) +    def test_b64_userdata(self): +        my_returns = MOCK_RETURNS.copy() +        my_returns['b64-user-data'] = "true" +        my_returns['b64-hostname'] = "true" +        for k in ('hostname', 'user-data'): +            my_returns[k] = base64.b64encode(my_returns[k]) + +        dsrc = self._get_ds(mockdata=my_returns) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEquals(MOCK_RETURNS['hostname'], +                          dsrc.metadata['local-hostname']) +        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) +        self.assertEquals(MOCK_RETURNS['root_authorized_keys'], +                          dsrc.metadata['public-keys']) + +    def test_b64_keys(self): +        my_returns = MOCK_RETURNS.copy() +        my_returns['base64_keys'] = 'hostname,ignored' +        for k in ('hostname',): +            my_returns[k] = base64.b64encode(my_returns[k]) + +        dsrc = self._get_ds(mockdata=my_returns) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEquals(MOCK_RETURNS['hostname'], +                          dsrc.metadata['local-hostname']) +        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) +      def test_userdata(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals("%s\n" % mock_returns['user-data'], -                          dsrc.userdata_raw) +        self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)      def test_disable_iptables_flag(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['disable_iptables_flag'], +        self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],                            dsrc.metadata['iptables_disable'])      def test_motd_sys_info(self): -        dsrc = self._get_ds() +        dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(mock_returns['enable_motd_sys_info'], +        self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],                            dsrc.metadata['motd_sys_info'])  | 
