From a1e585fb637e0239661d82a080af08076690828e Mon Sep 17 00:00:00 2001 From: Aditya Kumar Singh Date: Mon, 22 Apr 2024 09:42:38 +0530 Subject: [PATCH] tests: Basic cohosted MLDs functionality testing Add test cases to test basic cohosted MLDs functionality. Add helper functions to create the configuration file, start hostapd instance. Client connectivity test case will be added via a subsequent commit. eht_mld_cohosted_discovery: 2 co-hosted MLDs without non-MLD RNR. Basic bring up and beacon, MLD RNR, scan validation. eht_mld_cohosted_discovery_with_rnr: Same like eht_mld_cohosted_discovery but additionally non-MLD RNR (rnr=1) is also enabled. Validate the non-MLD RNR as well. Signed-off-by: Aditya Kumar Singh --- tests/hwsim/test_eht.py | 241 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 240 insertions(+), 1 deletion(-) diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py index 296445e27..8c021402f 100644 --- a/tests/hwsim/test_eht.py +++ b/tests/hwsim/test_eht.py @@ -1,10 +1,12 @@ # EHT tests -# Copyright (c) 2022, Qualcomm Innovation Center, Inc. +# Copyright (c) 2022-2024, Qualcomm Innovation Center, Inc. # # This software may be distributed under the terms of the BSD license. # See README for more details. import binascii +import tempfile + import hostapd from utils import * from hwsim import HWSimRadio @@ -1839,3 +1841,240 @@ def test_eht_mlo_csa(dev, apdev): traffic_test(wpas, hapd0) #TODO: CSA on non-first link + +def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g', + op_class=None): + # Create configuration file and add phy characteristics + fd, fname = tempfile.mkstemp(dir='/tmp', + prefix=prefix + iface + "-chan-" + str(channel) + "-") + f = os.fdopen(fd, 'w') + + f.write("driver=nl80211\n") + f.write("hw_mode=" + str(hw_mode) + "\n") + f.write("ieee80211n=1\n") + if hw_mode == 'a' and \ + (op_class is None or \ + op_class not in [131, 132, 133, 134, 135, 136, 137]): + f.write("ieee80211ac=1\n") + f.write("ieee80211ax=1\n") + f.write("ieee80211be=1\n") + f.write("channel=" + str(channel) + "\n") + + return f, fname + +def append_bss_conf_to_file(f, ifname, params, first=False): + # Add BSS specific characteristics + config = "bss" + + if first: + config = "interface" + + f.write("\n" + config + "=%s\n" % ifname) + + for k, v in list(params.items()): + f.write("{}={}\n".format(k,v)) + + f.write("mld_ap=1\n") + +def dump_config(fname): + with open(fname, 'r') as f: + cfg = f.read() + logger.debug("hostapd config: " + str(fname) + "\n" + cfg) + +def get_config(iface, count, ssid, passphrase, channel, bssid_regex, + rnr=False, debug=False): + f, fname = create_base_conf_file(iface, channel=channel) + hapds = [] + + for i in range(count): + if i == 0: + ifname = iface + else: + ifname = iface + "-" + str(i) + + set_ssid = ssid + str(i) + set_passphrase = passphrase + str(i) + params = hostapd.wpa2_params(ssid=set_ssid, passphrase=set_passphrase, + wpa_key_mgmt="SAE", ieee80211w="2") + params['sae_pwe'] = "2" + params['group_mgmt_cipher'] = "AES-128-CMAC" + params['beacon_prot'] = "1" + params["ctrl_interface"] = "/var/run/hostapd/chan_" + str(channel) + params["bssid"] = bssid_regex % (i + 1) + + if rnr: + params["rnr"] = "1" + + append_bss_conf_to_file(f, ifname, params, first=(i == 0)) + + hapds.append([ifname, params["ctrl_interface"], i]) + + f.close() + + if debug: + dump_config(fname) + + return fname, hapds + +def start_ap(prefix, configs): + pid = prefix + ".hostapd.pid" + configs = configs.split() + + cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f', + prefix + ".hostapd-log"] + + cmd = cmd + configs + + logger.info("Starting APs") + res = subprocess.check_call(cmd) + if res != 0: + raise Exception("Could not start hostapd: %s" % str(res)) + + # Wait for hostapd to complete initialization and daemonize. + time.sleep(2) + for i in range(20): + if os.path.exists(pid): + break + time.sleep(0.2) + + if not os.path.exists(pid): + raise Exception("hostapd did not create PID file.") + +def get_mld_devs(hapd_iface, count, prefix, rnr=False): + fname1, hapds1 = get_config(hapd_iface, count=count, ssid="mld-", + passphrase="qwertyuiop-", channel=1, + bssid_regex="02:00:00:00:07:%02x", + rnr=rnr, debug=True) + fname2, hapds2 = get_config(hapd_iface, count=count, ssid="mld-", + passphrase="qwertyuiop-", channel=6, + bssid_regex="02:00:00:00:08:%02x", + rnr=rnr, debug=True) + + start_ap(prefix, fname1 + " " + fname2) + + hapd_mld1_link0 = hostapd.Hostapd(ifname=hapds1[0][0], ctrl=hapds1[0][1], + bssidx=hapds1[0][2]) + hapd_mld1_link1 = hostapd.Hostapd(ifname=hapds2[0][0], ctrl=hapds2[0][1], + bssidx=hapds2[0][2]) + + hapd_mld2_link0 = hostapd.Hostapd(ifname=hapds1[1][0], ctrl=hapds1[1][1], + bssidx=hapds1[1][2]) + hapd_mld2_link1 = hostapd.Hostapd(ifname=hapds2[1][0], ctrl=hapds2[1][1], + bssidx=hapds2[1][2]) + + if not hapd_mld1_link0.ping(): + raise Exception("Could not ping hostapd") + + if not hapd_mld1_link1.ping(): + raise Exception("Could not ping hostapd") + + if not hapd_mld2_link0.ping(): + raise Exception("Could not ping hostapd") + + if not hapd_mld2_link1.ping(): + raise Exception("Could not ping hostapd") + + os.remove(fname1) + os.remove(fname2) + + return [hapd_mld1_link0, hapd_mld1_link1, hapd_mld2_link0, hapd_mld2_link1] + +def stop_mld_devs(hapds, pid): + pid = pid + ".hostapd.pid" + + if "OK" not in hapds[0].request("TERMINATE"): + raise Exception("Failed to terminate hostapd process") + + ev = hapds[0].wait_event(["CTRL-EVENT-TERMINATING"], timeout=15) + if ev is None: + raise Exception("CTRL-EVENT-TERMINATING not seen") + + time.sleep(0.5) + + for i in range(30): + time.sleep(0.1) + if not os.path.exists(pid): + break + if os.path.exists(pid): + raise Exception("PID file exits after process termination") + +def eht_parse_rnr(bss, rnr=False, exp_bssid=None): + partner_rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=", + re.MULTILINE) + ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE) + + if partner_rnr_pattern.search(bss) is None: + raise Exception("RNR element not found for first link of first MLD") + + if ml_pattern.search(bss) is None: + raise Exception("ML element not found for first link of first MLD") + + if not rnr: + return + + coloc_rnr_pattern = re.compile(".*ap_info.*, mld ID=255, link ID=..", + re.MULTILINE) + + if coloc_rnr_pattern.search(bss) is None: + raise Exception("RNR element not found for co-located BSS") + + line = coloc_rnr_pattern.search(bss).group() + if line.count('bssid') > 1: + raise Exception("More than one BSS found for co-located RNR") + + # Get the BSSID carried in the RNR + index = line.rindex('bssid') + bssid = line[index+len('bssid')+1:].split(',')[0] + + # Get the MLD ID carried in the RNR + index = line.rindex('link ID') + link_id = line[index+len('link ID')+1:].split(',')[0] + + if link_id != "15": + raise Exception("Unexpected link ID for co-located BSS which is not own partner") + + if bssid != exp_bssid: + raise Exception("Unexpected BSSID for co-located BSS") + +def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False): + with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + hapds = get_mld_devs(hapd_iface=hapd_iface, count=2, + prefix=params['prefix'], rnr=rnr) + + # Only scan link 0 + res = wpas.request("SCAN freq=2412") + if "FAIL" in res: + raise Exception("Failed to start scan") + + ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"]) + if ev is None: + raise Exception("Scan did not start") + + ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) + if ev is None: + raise Exception("Scan did not complete") + + logger.info("Scan done") + + bss = wpas.request("BSS " + hapds[0].own_addr()) + logger.info("BSS 0_0: " + str(bss)) + eht_parse_rnr(bss, rnr, hapds[2].own_addr()) + + bss = wpas.request("BSS " + hapds[2].own_addr()) + logger.info("BSS 1_0: " + str(bss)) + eht_parse_rnr(bss, rnr, hapds[0].own_addr()) + + stop_mld_devs(hapds, params['prefix']) + +def test_eht_mld_cohosted_discovery(dev, apdev, params): + """EHT 2 AP MLDs discovery""" + eht_mld_cohosted_discovery(dev, apdev, params) + +def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params): + """EHT 2 AP MLDs discovery (with co-location RNR)""" + eht_mld_cohosted_discovery(dev, apdev, params, rnr=True)