From 1d9d6c24328b699e1393e675874fca1c168d5f31 Mon Sep 17 00:00:00 2001 From: Thomas Pedersen Date: Tue, 4 Feb 2020 23:13:48 -0800 Subject: [PATCH] tests: Factor out multicast connectivity check A test may want to check multicast connectivity independent of unicast or check multicast without exercising unicast first. Factor out the multicast connectivity check code into its own function. Signed-off-by: Thomas Pedersen --- tests/hwsim/hostapd.py | 3 + tests/hwsim/hwsim_utils.py | 173 ++++++++++++++++------------------- tests/hwsim/wpasupplicant.py | 9 ++ 3 files changed, 93 insertions(+), 92 deletions(-) diff --git a/tests/hwsim/hostapd.py b/tests/hwsim/hostapd.py index 67e8a7fb8..fac287e99 100644 --- a/tests/hwsim/hostapd.py +++ b/tests/hwsim/hostapd.py @@ -179,6 +179,9 @@ class Hostapd: self.bssid = self.get_status_field('bssid[%d]' % self.bssidx) return self.bssid + def get_addr(self, group=False): + return self.own_addr() + def request(self, cmd): logger.debug(self.dbg + ": CTRL: " + cmd) return self.ctrl.request(cmd) diff --git a/tests/hwsim/hwsim_utils.py b/tests/hwsim/hwsim_utils.py index dccdefc71..eb312bf96 100644 --- a/tests/hwsim/hwsim_utils.py +++ b/tests/hwsim/hwsim_utils.py @@ -11,21 +11,80 @@ logger = logging.getLogger() from wpasupplicant import WpaSupplicant +def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2): + cmd = "DATA_TEST_CONFIG 1" + if ifname1: + cmd = cmd + " ifname=" + ifname1 + if dev1group: + res = dev1.group_request(cmd) + else: + res = dev1.request(cmd) + if "OK" not in res: + raise Exception("Failed to enable data test functionality") + + cmd = "DATA_TEST_CONFIG 1" + if ifname2: + cmd = cmd + " ifname=" + ifname2 + if dev2group: + res = dev2.group_request(cmd) + else: + res = dev2.request(cmd) + if "OK" not in res: + raise Exception("Failed to enable data test functionality") + +def run_multicast_connectivity_test(dev1, dev2, tos=None, + dev1group=False, dev2group=False, + ifname1=None, ifname2=None, + config=True, timeout=5, + send_len=None, multicast_to_unicast=False, + broadcast_retry_c=1): + addr1 = dev1.get_addr(dev1group) + addr2 = dev2.get_addr(dev2group) + + if config: + config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2) + + cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos) + if send_len is not None: + cmd += " len=" + str(send_len) + for i in range(broadcast_retry_c): + try: + if dev1group: + dev1.group_request(cmd) + else: + dev1.request(cmd) + if dev2group: + ev = dev2.wait_group_event(["DATA-TEST-RX"], + timeout=timeout) + else: + ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout) + if ev is None: + raise Exception("dev1->dev2 broadcast data delivery failed") + if multicast_to_unicast: + if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev: + raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing") + if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev: + raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)") + else: + if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev: + raise Exception("Unexpected dev1->dev2 broadcast data result") + if send_len is not None: + if " len=" + str(send_len) not in ev: + raise Exception("Unexpected dev1->dev2 broadcast data length") + else: + if " len=" in ev: + raise Exception("Unexpected dev1->dev2 broadcast data length") + break + except Exception as e: + if i == broadcast_retry_c - 1: + raise + def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, ifname1=None, ifname2=None, config=True, timeout=5, multicast_to_unicast=False, broadcast=True, send_len=None): - addr1 = dev1.own_addr() - if not dev1group and isinstance(dev1, WpaSupplicant): - addr = dev1.get_driver_status_field('addr') - if addr: - addr1 = addr - - addr2 = dev2.own_addr() - if not dev2group and isinstance(dev2, WpaSupplicant): - addr = dev2.get_driver_status_field('addr') - if addr: - addr2 = addr + addr1 = dev1.get_addr(dev1group) + addr2 = dev2.get_addr(dev2group) dev1.dump_monitor() dev2.dump_monitor() @@ -37,25 +96,7 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, try: if config: - cmd = "DATA_TEST_CONFIG 1" - if ifname1: - cmd = cmd + " ifname=" + ifname1 - if dev1group: - res = dev1.group_request(cmd) - else: - res = dev1.request(cmd) - if "OK" not in res: - raise Exception("Failed to enable data test functionality") - - cmd = "DATA_TEST_CONFIG 1" - if ifname2: - cmd = cmd + " ifname=" + ifname2 - if dev2group: - res = dev2.group_request(cmd) - else: - res = dev2.request(cmd) - if "OK" not in res: - raise Exception("Failed to enable data test functionality") + config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2) cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos) if send_len is not None: @@ -80,34 +121,10 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, raise Exception("Unexpected dev1->dev2 unicast data length") if broadcast: - cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos) - if send_len is not None: - cmd += " len=" + str(send_len) - for i in range(broadcast_retry_c): - try: - if dev1group: - dev1.group_request(cmd) - else: - dev1.request(cmd) - if dev2group: - ev = dev2.wait_group_event(["DATA-TEST-RX"], - timeout=timeout) - else: - ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout) - if ev is None: - raise Exception("dev1->dev2 broadcast data delivery failed") - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev: - raise Exception("Unexpected dev1->dev2 broadcast data result") - if send_len is not None: - if " len=" + str(send_len) not in ev: - raise Exception("Unexpected dev1->dev2 broadcast data length") - else: - if " len=" in ev: - raise Exception("Unexpected dev1->dev2 broadcast data length") - break - except Exception as e: - if i == broadcast_retry_c - 1: - raise + run_multicast_connectivity_test(dev1, dev2, tos, + dev1group, dev2group, + ifname1, ifname2, False, timeout, + send_len, False, broadcast_retry_c) cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos) if send_len is not None: @@ -132,40 +149,12 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, raise Exception("Unexpected dev2->dev1 unicast data length") if broadcast: - cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos) - if send_len is not None: - cmd += " len=" + str(send_len) - for i in range(broadcast_retry_c): - try: - if dev2group: - dev2.group_request(cmd) - else: - dev2.request(cmd) - if dev1group: - ev = dev1.wait_group_event(["DATA-TEST-RX"], - timeout=timeout) - else: - ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout) - if ev is None: - raise Exception("dev2->dev1 broadcast data delivery failed") - if multicast_to_unicast: - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing") - if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)") - else: - if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev: - raise Exception("Unexpected dev2->dev1 broadcast data result") - if send_len is not None: - if " len=" + str(send_len) not in ev: - raise Exception("Unexpected dev2->dev1 broadcast data length") - else: - if " len=" in ev: - raise Exception("Unexpected dev2->dev1 broadcast data length") - break - except Exception as e: - if i == broadcast_retry_c - 1: - raise + run_multicast_connectivity_test(dev2, dev1, tos, + dev2group, dev1group, + ifname2, ifname1, False, timeout, + send_len, multicast_to_unicast, + broadcast_retry_c) + finally: if config: if dev1group: diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 1287df80a..2283fa99c 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -598,6 +598,15 @@ class WpaSupplicant: res = self.p2p_dev_addr() return res + def get_addr(self, group=False): + dev_addr = self.own_addr() + if not group: + addr = self.get_status_field('address') + if addr: + dev_addr = addr + + return dev_addr + def p2p_listen(self): return self.global_request("P2P_LISTEN")