summaryrefslogtreecommitdiff
path: root/tests/common
diff options
context:
space:
mode:
authorSergey V. Lobanov <sergey@lobanov.in>2022-09-04 18:49:42 +0300
committerSergey V. Lobanov <sergey@lobanov.in>2022-09-04 19:06:49 +0300
commitc92ff6266b18a9655edef231391739f0479dfb3a (patch)
tree0196077cbd54c9607ba918a42cac567411197663 /tests/common
parent38d96b8e20608fb743d543fe3f08ad4b9d1dcd66 (diff)
downloadaccel-ppp-c92ff6266b18a9655edef231391739f0479dfb3a.tar.gz
accel-ppp-c92ff6266b18a9655edef231391739f0479dfb3a.zip
add tests and ci workflow for running tests
This commit adds tests (using python3 pytest framework): 1. Test basic accel-cmd commands (show version, show stat, etc) 2. Test ipoe shared session up (dhcpv4) without radius 3. Test pppoe discovery (without PADO delay) 4. Test pppoe discovery (without PADO delay) 5. Test pppoe session up (ipv4) without radius 6. Test vlan creation using vlan-mon (pppoe) These tests require external utils. Please read tests/README.md how to setup environment, how to run the tests and how to generate coverage report Also, run-tests.yml contains step-by-step instruction how to run the tests Signed-off-by: Sergey V. Lobanov <sergey@lobanov.in>
Diffstat (limited to 'tests/common')
-rw-r--r--tests/common/__init__.py0
-rw-r--r--tests/common/accel_pppd_process.py86
-rw-r--r--tests/common/config.py15
-rw-r--r--tests/common/dhclient_process.py45
-rw-r--r--tests/common/netns.py29
-rw-r--r--tests/common/pppd_process.py43
-rw-r--r--tests/common/process.py7
-rw-r--r--tests/common/veth.py84
-rw-r--r--tests/common/vlan.py13
9 files changed, 322 insertions, 0 deletions
diff --git a/tests/common/__init__.py b/tests/common/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/common/__init__.py
diff --git a/tests/common/accel_pppd_process.py b/tests/common/accel_pppd_process.py
new file mode 100644
index 00000000..c2ee451e
--- /dev/null
+++ b/tests/common/accel_pppd_process.py
@@ -0,0 +1,86 @@
+from subprocess import Popen, PIPE
+from common import process
+from threading import Thread
+import time
+
+
+def accel_pppd_thread_func(accel_pppd_control):
+ process = accel_pppd_control["process"]
+ print("accel_pppd_thread_func: before communicate")
+ (out, err) = process.communicate()
+ print(
+ "accel_pppd_thread_func: after communicate out=" + str(out) + " err=" + str(err)
+ )
+ process.wait()
+ print("accel_pppd_thread_func: after wait")
+
+
+def start(accel_pppd, args, accel_cmd, max_wait_time):
+ print("accel_pppd_start: begin")
+ accel_pppd_process = Popen([accel_pppd] + args, stdout=PIPE, stderr=PIPE)
+ accel_pppd_control = {"process": accel_pppd_process}
+ accel_pppd_thread = Thread(
+ target=accel_pppd_thread_func,
+ args=[accel_pppd_control],
+ )
+ accel_pppd_thread.start()
+
+ # wait until accel-pppd replies to 'show version'
+ # accel-pppd needs some time to be accessible
+ sleep_time = 0.0
+ is_started = False
+ while sleep_time < max_wait_time:
+ if accel_pppd_process.poll() is not None: # process is terminated
+ print(
+ "accel_pppd_start: terminated during 'show version' polling in (sec): "
+ + str(sleep_time)
+ )
+ is_started = False
+ break
+ (exit, out, err) = process.run([accel_cmd, "show version"])
+ if exit != 0: # does not reply
+ time.sleep(0.1)
+ sleep_time += 0.1
+ else: # replied
+ print("accel_pppd_start: 'show version' replied")
+ is_started = True
+ break
+
+ return (is_started, accel_pppd_thread, accel_pppd_control)
+
+
+def end(accel_pppd_thread, accel_pppd_control, accel_cmd, max_wait_time):
+ print("accel_pppd_end: begin")
+ if accel_pppd_control["process"].poll() is not None: # terminated
+ print("accel_pppd_end: already terminated. nothing to do")
+ accel_pppd_thread.join()
+ return
+
+ process.run(
+ [accel_cmd, "shutdown hard"]
+ ) # send shutdown hard command (in coverage mode it helps saving coverage data)
+ print("accel_pppd_end: after shutdown hard")
+
+ # wait until accel-pppd is finished
+ sleep_time = 0.0
+ is_finished = False
+ while sleep_time < max_wait_time:
+ if accel_pppd_control["process"].poll() is None: # not terminated yet
+ time.sleep(0.01)
+ sleep_time += 0.01
+ # print("accel_pppd_end: sleep 0.01")
+ else:
+ is_finished = True
+ print(
+ "accel_pppd_end: finished via shutdown hard in (sec): "
+ + str(sleep_time)
+ )
+ break
+
+ # accel-pppd is still alive. kill it
+ if not is_finished:
+ print("accel_pppd_end: kill process: " + str(accel_pppd_control["process"]))
+ accel_pppd_control["process"].kill() # kill -9 if 'shutdown hard' didn't help
+
+ accel_pppd_thread.join() # wait until thread is finished
+ print("accel_pppd_end: end")
diff --git a/tests/common/config.py b/tests/common/config.py
new file mode 100644
index 00000000..94ddcd8c
--- /dev/null
+++ b/tests/common/config.py
@@ -0,0 +1,15 @@
+import tempfile
+import os
+
+
+def make_tmp(content):
+ f = tempfile.NamedTemporaryFile(delete=False)
+ print("make_tmp filename: " + f.name)
+ f.write(bytes(content, "utf-8"))
+ f.close()
+ return f.name
+
+
+def delete_tmp(filename):
+ print("delete_tmp filename: " + filename)
+ os.unlink(filename)
diff --git a/tests/common/dhclient_process.py b/tests/common/dhclient_process.py
new file mode 100644
index 00000000..aaea4860
--- /dev/null
+++ b/tests/common/dhclient_process.py
@@ -0,0 +1,45 @@
+from subprocess import Popen, PIPE
+from threading import Thread
+
+
+def dhclient_thread_func(dhclient_control):
+ process = dhclient_control["process"]
+ print("dhclient_thread_func: before communicate")
+ (out, err) = process.communicate()
+ print(
+ "dhclient_thread_func: after communicate out=" + str(out) + " err=" + str(err)
+ )
+ process.wait()
+ print("dhclient_thread_func: after wait")
+
+
+def start(netns, dhclient, args):
+ print("dhclient_start: begin")
+ print("dhclient_start: args=" + str(args))
+ dhclient_process = Popen(
+ ["ip", "netns", "exec", netns] + [dhclient] + args, stdout=PIPE, stderr=PIPE
+ )
+ print("dhclient_start: dhclient_process=" + str(dhclient_process))
+ dhclient_control = {"process": dhclient_process}
+ dhclient_thread = Thread(
+ target=dhclient_thread_func,
+ args=[dhclient_control],
+ )
+ dhclient_thread.start()
+
+ is_started = True if dhclient_process.poll() is None else False
+
+ return (is_started, dhclient_thread, dhclient_control)
+
+
+def end(dhclient_thread, dhclient_control):
+ print("dhclient_end: begin")
+ if dhclient_control["process"].poll() is not None: # already terminated
+ print("dhclient_end: already terminated. nothing to do")
+ dhclient_thread.join()
+ return
+
+ print("dhclient_end: kill process: " + str(dhclient_control["process"]))
+ dhclient_control["process"].kill() # kill -9
+ dhclient_thread.join() # wait until thread is finished
+ print("dhclient_end: end")
diff --git a/tests/common/netns.py b/tests/common/netns.py
new file mode 100644
index 00000000..9b0d729b
--- /dev/null
+++ b/tests/common/netns.py
@@ -0,0 +1,29 @@
+from common import process
+
+# creates netns and returns netns name. if ok return 0
+def create(netns_name):
+ netns, out, err = process.run(["ip", "netns", "add", netns_name])
+ print("netns.create: exit=%d out=%s err=%s" % (netns, out, err))
+
+ return netns
+
+
+# deletes netns. if ok return 0
+def delete(netns_name):
+ netns, out, err = process.run(["ip", "netns", "delete", netns_name])
+ print("netns.delete: exit=%d out=%s err=%s" % (netns, out, err))
+
+ return netns
+
+
+# execute command in netns using process.run
+# if netns_name is None, then execute in global rt
+def exec(netns_name, command):
+ if netns_name is None:
+ exit, out, err = process.run(command)
+ else:
+ exit, out, err = process.run(["ip", "netns", "exec", netns_name] + command)
+
+ print("netns.exec: netns=%s command=%s :: exit=%d out=%s err=%s" % (netns_name, str(command), exit, out, err))
+
+ return (exit, out, err)
diff --git a/tests/common/pppd_process.py b/tests/common/pppd_process.py
new file mode 100644
index 00000000..45844400
--- /dev/null
+++ b/tests/common/pppd_process.py
@@ -0,0 +1,43 @@
+from subprocess import Popen, PIPE
+from threading import Thread
+
+
+def pppd_thread_func(pppd_control):
+ process = pppd_control["process"]
+ print("pppd_thread_func: before communicate")
+ (out, err) = process.communicate()
+ print("pppd_thread_func: after communicate out=" + str(out) + " err=" + str(err))
+ process.wait()
+ print("pppd_thread_func: after wait")
+
+
+def start(netns, pppd, args):
+ print("pppd_start: begin")
+ print("pppd_start: args=" + str(args))
+ pppd_process = Popen(
+ ["ip", "netns", "exec", netns] + [pppd] + args, stdout=PIPE, stderr=PIPE
+ )
+ print("pppd_start: pppd_process=" + str(pppd_process))
+ pppd_control = {"process": pppd_process}
+ pppd_thread = Thread(
+ target=pppd_thread_func,
+ args=[pppd_control],
+ )
+ pppd_thread.start()
+
+ is_started = True if pppd_process.poll() is None else False
+
+ return (is_started, pppd_thread, pppd_control)
+
+
+def end(pppd_thread, pppd_control):
+ print("pppd_end: begin")
+ if pppd_control["process"].poll() is not None: # already terminated
+ print("pppd_end: already terminated. nothing to do")
+ pppd_thread.join()
+ return
+
+ print("pppd_end: kill process: " + str(pppd_control["process"]))
+ pppd_control["process"].kill() # kill -9
+ pppd_thread.join() # wait until thread is finished
+ print("pppd_end: end")
diff --git a/tests/common/process.py b/tests/common/process.py
new file mode 100644
index 00000000..e0c61363
--- /dev/null
+++ b/tests/common/process.py
@@ -0,0 +1,7 @@
+from subprocess import Popen, PIPE
+
+def run(command):
+ process = Popen(command, stdout=PIPE, stderr=PIPE)
+ (out, err) = process.communicate()
+ exit_code = process.wait()
+ return (exit_code, out.decode("utf-8"), err.decode("utf-8")) \ No newline at end of file
diff --git a/tests/common/veth.py b/tests/common/veth.py
new file mode 100644
index 00000000..a31d2453
--- /dev/null
+++ b/tests/common/veth.py
@@ -0,0 +1,84 @@
+from common import process, netns, vlan
+import time
+import math
+
+# creates veth pair. if ok returns 0
+def create_pair(name_a, name_b):
+ veth, out, err = process.run(
+ ["ip", "link", "add", name_a, "type", "veth", "peer", "name", name_b]
+ )
+ print("veth.create: exit=%d out=%s err=%s" % (veth, out, err))
+
+ return veth
+
+
+# deletes veth pair. if ok returns 0
+def delete_veth(name_a):
+ veth, out, err = process.run(["ip", "link", "delete", name_a])
+ print("veth.delete: exit=%d out=%s err=%s" % (veth, out, err))
+
+ return veth
+
+
+# put veth to netns. if ok returns 0
+def assign_netns(veth, netns):
+ veth, out, err = process.run(["ip", "link", "set", veth, "netns", netns])
+ print("veth.assign_netns: exit=%d out=%s err=%s" % (veth, out, err))
+
+ return veth
+
+
+# up interface. if netns is None, then up in global rt. if ok returns 0
+def up_interface(iface, netns_name):
+ command = ["ip", "link", "set", iface, "up"]
+ exit, out, err = netns.exec(netns_name, command)
+ print(
+ "veth.up_interface: iface=%s netns=%s exit=%d out=%s err=%s"
+ % (iface, str(netns_name), exit, out, err)
+ )
+
+ return exit
+
+
+# creates netns, creates veth pair and place second link to netns
+# creates vlans over veth interfaces according to veth_pair_vlans_config
+# return dict with 'netns', 'veth_a', 'veth_b'
+def create_veth_pair_netns(veth_pair_vlans_config):
+
+ name = str(math.floor(time.time() * 1000) % 1000000)
+ netns_name = "N" + name
+ netns_status = netns.create(netns_name)
+ print("create_veth_pair_netns: netns_status=%d" % netns_status)
+
+ veth_a = "A" + name
+ veth_b = "B" + name
+ pair_status = create_pair(veth_a, veth_b)
+ print("create_veth_pair_netns: pair_status=%d" % pair_status)
+
+ up_interface(veth_a, None)
+
+ assign_status = assign_netns(veth_b, netns_name)
+ print("create_veth_pair_netns: assign_status=%d" % assign_status)
+
+ up_interface(veth_b, netns_name)
+
+ vlans_a = veth_pair_vlans_config["vlans_a"]
+ for vlan_num in vlans_a:
+ vlan.create(veth_a, vlan_num, None)
+ up_interface(veth_a + "." + str(vlan_num), None)
+
+ vlans_b = veth_pair_vlans_config["vlans_b"]
+ for vlan_num in vlans_b:
+ vlan.create(veth_b, vlan_num, netns_name)
+ up_interface(veth_b + "." + str(vlan_num), netns_name)
+
+ return {"netns": netns_name, "veth_a": veth_a, "veth_b": veth_b}
+
+
+# deletes veth pair and netns created by create_veth_pair_netns
+def delete_veth_pair_netns(veth_pair_netns):
+ veth_status = delete_veth(veth_pair_netns["veth_a"])
+ print("delete_veth_pair_netns: veth_status=%d" % veth_status)
+
+ netns_status = netns.delete(veth_pair_netns["netns"])
+ print("delete_veth_pair_netns: netns_status=%d" % netns_status)
diff --git a/tests/common/vlan.py b/tests/common/vlan.py
new file mode 100644
index 00000000..459efb18
--- /dev/null
+++ b/tests/common/vlan.py
@@ -0,0 +1,13 @@
+from common import netns
+
+# up interface. if netns is None, then up in global rt. if ok returns 0
+def create(parent_if, vlan, netns_name):
+ command = (
+ "ip link add link %s name %s.%d type vlan id %d"
+ % (parent_if, parent_if, vlan, vlan)
+ ).split()
+
+ vlan, out, err = netns.exec(netns_name, command)
+ print("vlan.create: exit=%d out=%s err=%s" % (vlan, out, err))
+
+ return vlan