1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
from cloudinit import util
from cloudinit import net
from .helpers import TestCase
import base64
import copy
import io
import gzip
import json
import os
DHCP_CONTENT_1 = """
DEVICE='eth0'
PROTO='dhcp'
IPV4ADDR='192.168.122.89'
IPV4BROADCAST='192.168.122.255'
IPV4NETMASK='255.255.255.0'
IPV4GATEWAY='192.168.122.1'
IPV4DNS0='192.168.122.1'
IPV4DNS1='0.0.0.0'
HOSTNAME='foohost'
DNSDOMAIN=''
NISDOMAIN=''
ROOTSERVER='192.168.122.1'
ROOTPATH=''
filename=''
UPTIME='21'
DHCPLEASETIME='3600'
DOMAINSEARCH='foo.com'
"""
DHCP_EXPECTED_1 = {
'name': 'eth0',
'type': 'physical',
'subnets': [{'broadcast': '192.168.122.255',
'control': 'manual',
'gateway': '192.168.122.1',
'dns_search': ['foo.com'],
'type': 'dhcp',
'netmask': '255.255.255.0',
'dns_nameservers': ['192.168.122.1']}],
}
STATIC_CONTENT_1 = """
DEVICE='eth1'
PROTO='static'
IPV4ADDR='10.0.0.2'
IPV4BROADCAST='10.0.0.255'
IPV4NETMASK='255.255.255.0'
IPV4GATEWAY='10.0.0.1'
IPV4DNS0='10.0.1.1'
IPV4DNS1='0.0.0.0'
HOSTNAME='foohost'
UPTIME='21'
DHCPLEASETIME='3600'
DOMAINSEARCH='foo.com'
"""
STATIC_EXPECTED_1 = {
'name': 'eth1',
'type': 'physical',
'subnets': [{'broadcast': '10.0.0.255', 'control': 'manual',
'gateway': '10.0.0.1',
'dns_search': ['foo.com'], 'type': 'static',
'netmask': '255.255.255.0',
'dns_nameservers': ['10.0.1.1']}],
}
class TestNetConfigParsing(TestCase):
simple_cfg = {
'config': [{"type": "physical", "name": "eth0",
"mac_address": "c0:d6:9f:2c:e8:80",
"subnets": [{"type": "dhcp"}]}]}
def test_klibc_convert_dhcp(self):
found = net._klibc_to_config_entry(DHCP_CONTENT_1)
self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
def test_klibc_convert_static(self):
found = net._klibc_to_config_entry(STATIC_CONTENT_1)
self.assertEqual(found, ('eth1', STATIC_EXPECTED_1))
def test_config_from_klibc_net_cfg(self):
files = []
pairs = (('net-eth0.cfg', DHCP_CONTENT_1),
('net-eth1.cfg', STATIC_CONTENT_1))
macs = {'eth1': 'b8:ae:ed:75:ff:2b',
'eth0': 'b8:ae:ed:75:ff:2a'}
dhcp = copy.deepcopy(DHCP_EXPECTED_1)
dhcp['mac_address'] = macs['eth0']
static = copy.deepcopy(STATIC_EXPECTED_1)
static['mac_address'] = macs['eth1']
expected = {'version': 1, 'config': [dhcp, static]}
with util.tempdir() as tmpd:
for fname, content in pairs:
fp = os.path.join(tmpd, fname)
files.append(fp)
util.write_file(fp, content)
found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs)
self.assertEqual(found, expected)
def test_cmdline_with_b64(self):
data = base64.b64encode(json.dumps(self.simple_cfg).encode())
encoded_text = data.decode()
cmdline = 'ro network-config=' + encoded_text + ' root=foo'
found = net.read_kernel_cmdline_config(cmdline=cmdline)
self.assertEqual(found, self.simple_cfg)
def test_cmdline_with_b64_gz(self):
data = _gzip_data(json.dumps(self.simple_cfg).encode())
encoded_text = base64.b64encode(data).decode()
cmdline = 'ro network-config=' + encoded_text + ' root=foo'
found = net.read_kernel_cmdline_config(cmdline=cmdline)
self.assertEqual(found, self.simple_cfg)
def _gzip_data(data):
with io.BytesIO() as iobuf:
gzfp = gzip.GzipFile(mode="wb", fileobj=iobuf)
gzfp.write(data)
gzfp.close()
return iobuf.getvalue()
|