tests: Basic cohosted MLDs functionality testing

Add test cases to test basic cohosted MLDs functionality. Add helper
functions to create the configuration file, start hostapd instance.

Client connectivity test case will be added via a subsequent commit.

eht_mld_cohosted_discovery: 2 co-hosted MLDs without non-MLD RNR. Basic
bring up and beacon, MLD RNR, scan validation.

eht_mld_cohosted_discovery_with_rnr: Same like eht_mld_cohosted_discovery
but additionally non-MLD RNR (rnr=1) is also enabled. Validate the non-MLD
RNR as well.

Signed-off-by: Aditya Kumar Singh <quic_adisi@quicinc.com>
This commit is contained in:
Aditya Kumar Singh 2024-04-22 09:42:38 +05:30 committed by Jouni Malinen
parent 51b5b9512f
commit a1e585fb63

View file

@ -1,10 +1,12 @@
# EHT tests
# Copyright (c) 2022, Qualcomm Innovation Center, Inc.
# Copyright (c) 2022-2024, Qualcomm Innovation Center, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import binascii
import tempfile
import hostapd
from utils import *
from hwsim import HWSimRadio
@ -1839,3 +1841,240 @@ def test_eht_mlo_csa(dev, apdev):
traffic_test(wpas, hapd0)
#TODO: CSA on non-first link
def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g',
op_class=None):
# Create configuration file and add phy characteristics
fd, fname = tempfile.mkstemp(dir='/tmp',
prefix=prefix + iface + "-chan-" + str(channel) + "-")
f = os.fdopen(fd, 'w')
f.write("driver=nl80211\n")
f.write("hw_mode=" + str(hw_mode) + "\n")
f.write("ieee80211n=1\n")
if hw_mode == 'a' and \
(op_class is None or \
op_class not in [131, 132, 133, 134, 135, 136, 137]):
f.write("ieee80211ac=1\n")
f.write("ieee80211ax=1\n")
f.write("ieee80211be=1\n")
f.write("channel=" + str(channel) + "\n")
return f, fname
def append_bss_conf_to_file(f, ifname, params, first=False):
# Add BSS specific characteristics
config = "bss"
if first:
config = "interface"
f.write("\n" + config + "=%s\n" % ifname)
for k, v in list(params.items()):
f.write("{}={}\n".format(k,v))
f.write("mld_ap=1\n")
def dump_config(fname):
with open(fname, 'r') as f:
cfg = f.read()
logger.debug("hostapd config: " + str(fname) + "\n" + cfg)
def get_config(iface, count, ssid, passphrase, channel, bssid_regex,
rnr=False, debug=False):
f, fname = create_base_conf_file(iface, channel=channel)
hapds = []
for i in range(count):
if i == 0:
ifname = iface
else:
ifname = iface + "-" + str(i)
set_ssid = ssid + str(i)
set_passphrase = passphrase + str(i)
params = hostapd.wpa2_params(ssid=set_ssid, passphrase=set_passphrase,
wpa_key_mgmt="SAE", ieee80211w="2")
params['sae_pwe'] = "2"
params['group_mgmt_cipher'] = "AES-128-CMAC"
params['beacon_prot'] = "1"
params["ctrl_interface"] = "/var/run/hostapd/chan_" + str(channel)
params["bssid"] = bssid_regex % (i + 1)
if rnr:
params["rnr"] = "1"
append_bss_conf_to_file(f, ifname, params, first=(i == 0))
hapds.append([ifname, params["ctrl_interface"], i])
f.close()
if debug:
dump_config(fname)
return fname, hapds
def start_ap(prefix, configs):
pid = prefix + ".hostapd.pid"
configs = configs.split()
cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f',
prefix + ".hostapd-log"]
cmd = cmd + configs
logger.info("Starting APs")
res = subprocess.check_call(cmd)
if res != 0:
raise Exception("Could not start hostapd: %s" % str(res))
# Wait for hostapd to complete initialization and daemonize.
time.sleep(2)
for i in range(20):
if os.path.exists(pid):
break
time.sleep(0.2)
if not os.path.exists(pid):
raise Exception("hostapd did not create PID file.")
def get_mld_devs(hapd_iface, count, prefix, rnr=False):
fname1, hapds1 = get_config(hapd_iface, count=count, ssid="mld-",
passphrase="qwertyuiop-", channel=1,
bssid_regex="02:00:00:00:07:%02x",
rnr=rnr, debug=True)
fname2, hapds2 = get_config(hapd_iface, count=count, ssid="mld-",
passphrase="qwertyuiop-", channel=6,
bssid_regex="02:00:00:00:08:%02x",
rnr=rnr, debug=True)
start_ap(prefix, fname1 + " " + fname2)
hapd_mld1_link0 = hostapd.Hostapd(ifname=hapds1[0][0], ctrl=hapds1[0][1],
bssidx=hapds1[0][2])
hapd_mld1_link1 = hostapd.Hostapd(ifname=hapds2[0][0], ctrl=hapds2[0][1],
bssidx=hapds2[0][2])
hapd_mld2_link0 = hostapd.Hostapd(ifname=hapds1[1][0], ctrl=hapds1[1][1],
bssidx=hapds1[1][2])
hapd_mld2_link1 = hostapd.Hostapd(ifname=hapds2[1][0], ctrl=hapds2[1][1],
bssidx=hapds2[1][2])
if not hapd_mld1_link0.ping():
raise Exception("Could not ping hostapd")
if not hapd_mld1_link1.ping():
raise Exception("Could not ping hostapd")
if not hapd_mld2_link0.ping():
raise Exception("Could not ping hostapd")
if not hapd_mld2_link1.ping():
raise Exception("Could not ping hostapd")
os.remove(fname1)
os.remove(fname2)
return [hapd_mld1_link0, hapd_mld1_link1, hapd_mld2_link0, hapd_mld2_link1]
def stop_mld_devs(hapds, pid):
pid = pid + ".hostapd.pid"
if "OK" not in hapds[0].request("TERMINATE"):
raise Exception("Failed to terminate hostapd process")
ev = hapds[0].wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
if ev is None:
raise Exception("CTRL-EVENT-TERMINATING not seen")
time.sleep(0.5)
for i in range(30):
time.sleep(0.1)
if not os.path.exists(pid):
break
if os.path.exists(pid):
raise Exception("PID file exits after process termination")
def eht_parse_rnr(bss, rnr=False, exp_bssid=None):
partner_rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
re.MULTILINE)
ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)
if partner_rnr_pattern.search(bss) is None:
raise Exception("RNR element not found for first link of first MLD")
if ml_pattern.search(bss) is None:
raise Exception("ML element not found for first link of first MLD")
if not rnr:
return
coloc_rnr_pattern = re.compile(".*ap_info.*, mld ID=255, link ID=..",
re.MULTILINE)
if coloc_rnr_pattern.search(bss) is None:
raise Exception("RNR element not found for co-located BSS")
line = coloc_rnr_pattern.search(bss).group()
if line.count('bssid') > 1:
raise Exception("More than one BSS found for co-located RNR")
# Get the BSSID carried in the RNR
index = line.rindex('bssid')
bssid = line[index+len('bssid')+1:].split(',')[0]
# Get the MLD ID carried in the RNR
index = line.rindex('link ID')
link_id = line[index+len('link ID')+1:].split(',')[0]
if link_id != "15":
raise Exception("Unexpected link ID for co-located BSS which is not own partner")
if bssid != exp_bssid:
raise Exception("Unexpected BSSID for co-located BSS")
def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False):
with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
hapds = get_mld_devs(hapd_iface=hapd_iface, count=2,
prefix=params['prefix'], rnr=rnr)
# Only scan link 0
res = wpas.request("SCAN freq=2412")
if "FAIL" in res:
raise Exception("Failed to start scan")
ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
if ev is None:
raise Exception("Scan did not start")
ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
if ev is None:
raise Exception("Scan did not complete")
logger.info("Scan done")
bss = wpas.request("BSS " + hapds[0].own_addr())
logger.info("BSS 0_0: " + str(bss))
eht_parse_rnr(bss, rnr, hapds[2].own_addr())
bss = wpas.request("BSS " + hapds[2].own_addr())
logger.info("BSS 1_0: " + str(bss))
eht_parse_rnr(bss, rnr, hapds[0].own_addr())
stop_mld_devs(hapds, params['prefix'])
def test_eht_mld_cohosted_discovery(dev, apdev, params):
"""EHT 2 AP MLDs discovery"""
eht_mld_cohosted_discovery(dev, apdev, params)
def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params):
"""EHT 2 AP MLDs discovery (with co-location RNR)"""
eht_mld_cohosted_discovery(dev, apdev, params, rnr=True)