From f5592e2d5e3052ee81aba08c1128db5dd36b10a1 Mon Sep 17 00:00:00 2001 From: Andrei Otcheretianski Date: Mon, 22 May 2023 22:34:12 +0300 Subject: [PATCH] tests: Add basic MLD hwsim tests Signed-off-by: Andrei Otcheretianski --- tests/hwsim/example-hostapd.config | 1 + tests/hwsim/hostapd.py | 82 +++++++- tests/hwsim/hwsim.py | 11 +- tests/hwsim/test_eht.py | 292 +++++++++++++++++++++++++++++ 4 files changed, 379 insertions(+), 7 deletions(-) diff --git a/tests/hwsim/example-hostapd.config b/tests/hwsim/example-hostapd.config index 5b7130fdc..e6f91fe38 100644 --- a/tests/hwsim/example-hostapd.config +++ b/tests/hwsim/example-hostapd.config @@ -117,3 +117,4 @@ CONFIG_DPP2=y CONFIG_WEP=y CONFIG_PASN=y CONFIG_AIRTIME_POLICY=y +CONFIG_IEEE80211BE=y diff --git a/tests/hwsim/hostapd.py b/tests/hwsim/hostapd.py index 77b210b6e..b93d5caed 100644 --- a/tests/hwsim/hostapd.py +++ b/tests/hwsim/hostapd.py @@ -101,6 +101,11 @@ class HostapdGlobal: if not ignore_error: raise Exception("Could not add hostapd BSS") + def add_link(self, ifname, confname): + res = self.request("ADD " + ifname + " config=" + confname) + if "OK" not in res: + raise Exception("Could not add hostapd link") + def remove(self, ifname): self.request("REMOVE " + ifname, timeout=30) @@ -141,13 +146,14 @@ class HostapdGlobal: self.host.send_file(src, dst) class Hostapd: - def __init__(self, ifname, bssidx=0, hostname=None, port=8877): + def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl, + port=8877): self.hostname = hostname self.host = remotehost.Host(hostname, ifname) self.ifname = ifname if hostname is None: - self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname)) - self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname)) + self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname)) + self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname)) self.dbg = ifname else: self.ctrl = wpaspy.Ctrl(hostname, port) @@ -156,6 +162,7 @@ class Hostapd: self.mon.attach() self.bssid = None self.bssidx = bssidx + self.mld_addr = None def cmd_execute(self, cmd_array, shell=False): if self.hostname is None: @@ -184,8 +191,15 @@ class Hostapd: self.bssid = self.get_status_field('bssid[%d]' % self.bssidx) return self.bssid + def own_mld_addr(self): + if self.mld_addr is None: + self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx) + return self.mld_addr + def get_addr(self, group=False): - return self.own_addr() + if self.own_mld_addr() is None: + return self.own_addr() + return self.own_mld_addr() def request(self, cmd): logger.debug(self.dbg + ": CTRL: " + cmd) @@ -682,6 +696,37 @@ def add_iface(apdev, confname): raise Exception("Could not ping hostapd") return hapd +def add_mld_link(apdev, params): + if isinstance(apdev, dict): + ifname = apdev['ifname'] + try: + hostname = apdev['hostname'] + port = apdev['port'] + logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname) + except: + logger.info("Adding link on: ifname=" + ifname) + hostname = None + port = 8878 + else: + ifname = apdev + logger.info("Adding link on: ifname=" + ifname) + hostname = None + port = 8878 + + hapd_global = HostapdGlobal(apdev) + confname, ctrl_iface = cfg_mld_link_file(ifname, params) + hapd_global.send_file(confname, confname) + try: + hapd_global.add_link(ifname, confname) + except Exception as e: + if str(e) == "Could not add hostapd link": + raise utils.HwsimSkip("No MLO support in hostapd") + port = hapd_global.get_ctrl_iface_port(ifname) + hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port) + if not hapd.ping(): + raise Exception("Could not ping hostapd") + return hapd + def remove_bss(apdev, ifname=None): if ifname == None: ifname = apdev['ifname'] @@ -904,3 +949,32 @@ def cfg_file(apdev, conf, ifname=None): return fname return conf + +idx = 0 +def cfg_mld_link_file(ifname, params): + global idx + ctrl_iface="/var/run/hostapd" + conf = "link-%d.conf" % idx + + fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-') + f = os.fdopen(fd, 'w') + + if idx != 0: + ctrl_iface="/var/run/hostapd_%d" % idx + + f.write("ctrl_interface=%s\n" % ctrl_iface) + f.write("driver=nl80211\n") + f.write("ieee80211n=1\n") + f.write("ieee80211ac=1\n") + f.write("ieee80211ax=1\n") + f.write("ieee80211be=1\n") + f.write("interface=%s\n" % ifname) + f.write("mld_ap=1\n") + f.write("mld_id=0\n") + + for k, v in list(params.items()): + f.write("{}={}\n".format(k,v)) + + idx = idx + 1 + + return fname, ctrl_iface diff --git a/tests/hwsim/hwsim.py b/tests/hwsim/hwsim.py index bc8aabdd4..5b1f858c9 100644 --- a/tests/hwsim/hwsim.py +++ b/tests/hwsim/hwsim.py @@ -17,6 +17,7 @@ HWSIM_ATTR_CHANNELS = 9 HWSIM_ATTR_RADIO_ID = 10 HWSIM_ATTR_SUPPORT_P2P_DEVICE = 14 HWSIM_ATTR_USE_CHANCTX = 15 +HWSIM_ATTR_MLO_SUPPORT = 25 # the controller class class HWSimController(object): @@ -25,7 +26,7 @@ class HWSimController(object): self._fid = netlink.genl_controller.get_family_id(b'MAC80211_HWSIM') def create_radio(self, n_channels=None, use_chanctx=False, - use_p2p_device=False): + use_p2p_device=False, use_mlo=False): attrs = [] if n_channels: attrs.append(netlink.U32Attr(HWSIM_ATTR_CHANNELS, n_channels)) @@ -33,6 +34,8 @@ class HWSimController(object): attrs.append(netlink.FlagAttr(HWSIM_ATTR_USE_CHANCTX)) if use_p2p_device: attrs.append(netlink.FlagAttr(HWSIM_ATTR_SUPPORT_P2P_DEVICE)) + if use_mlo: + attrs.append(netlink.FlagAttr(HWSIM_ATTR_MLO_SUPPORT)) msg = netlink.GenlMessage(self._fid, HWSIM_CMD_CREATE_RADIO, flags=netlink.NLM_F_REQUEST | @@ -50,17 +53,19 @@ class HWSimController(object): class HWSimRadio(object): def __init__(self, n_channels=None, use_chanctx=False, - use_p2p_device=False): + use_p2p_device=False, use_mlo=False): self._controller = HWSimController() self._n_channels = n_channels self._use_chanctx = use_chanctx self._use_p2p_dev = use_p2p_device + self._use_mlo = use_mlo def __enter__(self): self._radio_id = self._controller.create_radio( n_channels=self._n_channels, use_chanctx=self._use_chanctx, - use_p2p_device=self._use_p2p_dev) + use_p2p_device=self._use_p2p_dev, + use_mlo=self._use_mlo) if self._radio_id < 0: raise Exception("Failed to create radio (err:%d)" % self._radio_id) try: diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py index ebc846c0d..955f73589 100644 --- a/tests/hwsim/test_eht.py +++ b/tests/hwsim/test_eht.py @@ -6,6 +6,59 @@ import hostapd from utils import * +from hwsim import HWSimRadio +import hwsim_utils +from wpasupplicant import WpaSupplicant +import re + +def eht_verify_wifi_version(dev): + status = dev.get_status() + logger.info("station status: " + str(status)) + + if 'wifi_generation' not in status: + raise Exception("Missing wifi_generation information") + if status['wifi_generation'] != "7": + raise Exception("Unexpected wifi_generation value: " + status['wifi_generation']) + +def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False, + mld=False): + status = hapd.get_status() + + logger.info("hostapd STATUS: " + str(status)) + if is_ht and status["ieee80211n"] != "1": + raise Exception("Unexpected STATUS ieee80211n value") + if is_vht and status["ieee80211ac"] != "1": + raise Exception("Unexpected STATUS ieee80211ac value") + if status["ieee80211ax"] != "1": + raise Exception("Unexpected STATUS ieee80211ax value") + if status["ieee80211be"] != "1": + raise Exception("Unexpected STATUS ieee80211be value") + + sta = hapd.get_sta(wpas.own_addr()) + logger.info("hostapd STA: " + str(sta)) + if is_ht and "[HT]" not in sta['flags']: + raise Exception("Missing STA flag: HT") + if is_vht and "[VHT]" not in sta['flags']: + raise Exception("Missing STA flag: VHT") + if "[HE]" not in sta['flags']: + raise Exception("Missing STA flag: HE") + if "[EHT]" not in sta['flags']: + raise Exception("Missing STA flag: EHT") + + sig = wpas.request("SIGNAL_POLL").splitlines() + + # TODO: With MLD connection, signal poll logic is still not implemented. + # While mac80211 maintains the station using the MLD address, the + # information is maintained in the link stations, but it is not sent to + # user space yet. + if not mld: + if "FREQUENCY=%s" % freq not in sig: + raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig)) + if "WIDTH=%s MHz" % bw not in sig: + raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig)) + +def traffic_test(wpas, hapd): + hwsim_utils.test_connectivity(wpas, hapd) def test_eht_open(dev, apdev): """EHT AP with open mode configuration""" @@ -118,3 +171,242 @@ def test_eht_sae_mlo(dev, apdev): finally: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "0") + +def eht_mld_enable_ap(iface, params): + hapd = hostapd.add_mld_link(iface, params) + hapd.enable() + + ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1) + if ev is None: + raise Exception("AP startup timed out") + if "AP-ENABLED" not in ev: + raise Exception("AP startup failed") + + return hapd + +def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256", + mfp="2", pwe=None, beacon_prot="1"): + params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase, + wpa_key_mgmt=key_mgmt, ieee80211w=mfp) + params['ieee80211n'] = '1' + params['ieee80211ax'] = '1' + params['ieee80211be'] = '1' + params['channel'] = '1' + params['hw_mode'] = 'g' + params['group_mgmt_cipher'] = "AES-128-CMAC" + params['beacon_prot'] = beacon_prot + + if pwe is not None: + params['sae_pwe'] = pwe + + return params + +def test_eht_mld_discovery(dev, apdev): + """EHT MLD AP discovery""" + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + ssid = "mld_ap" + link0_params = {"ssid": ssid, + "hw_mode": "g", + "channel": "1"} + link1_params = {"ssid": ssid, + "hw_mode": "g", + "channel": "2"} + + hapd0 = eht_mld_enable_ap(hapd_iface, link0_params) + hapd1 = eht_mld_enable_ap(hapd_iface, link1_params) + + res = wpas.request("SCAN freq=2412,2417") + if "FAIL" in res: + raise Exception("Failed to start scan") + + ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"]) + if ev is None: + raise Exception("Scan did not start") + + ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) + if ev is None: + raise Exception("Scan did not complete") + + logger.info("Scan done") + + rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=", + re.MULTILINE) + ml_pattern = re.compile(".*multi-link:.*, MLD ID=0x0", re.MULTILINE) + + bss = wpas.request("BSS " + hapd0.own_addr()) + logger.info("BSS 0: " + str(bss)) + + if rnr_pattern.search(bss) is None: + raise Exception("RNR element not found for first link") + + if ml_pattern.search(bss) is None: + raise Exception("ML element not found for first link") + + bss = wpas.request("BSS " + hapd1.own_addr()) + logger.info("BSS 1: " + str(bss)) + + if rnr_pattern.search(bss) is None: + raise Exception("RNR element not found for second link") + + if ml_pattern.search(bss) is None: + raise Exception("ML element not found for second link") + +def test_eht_mld_owe_two_links(dev, apdev): + """EHT MLD AP with MLD client OWE connection using two links""" + with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \ + HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + ssid = "mld_ap_owe_two_link" + params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2") + + hapd0 = eht_mld_enable_ap(hapd0_iface, params) + + params['channel'] = '6' + + hapd1 = eht_mld_enable_ap(hapd0_iface, params) + # Check legacy client connection + dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2") + wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE", + ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True) + eht_verify_wifi_version(wpas) + traffic_test(wpas, hapd0) + traffic_test(wpas, hapd1) + +def test_eht_mld_sae_single_link(dev, apdev): + """EHT MLD AP with MLD client SAE H2E connection using single link""" + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + passphrase = 'qwertyuiop' + ssid = "mld_ap_sae_single_link" + params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", + mfp="2", pwe='2') + + hapd0 = eht_mld_enable_ap(hapd_iface, params) + + wpas.set("sae_pwe", "1") + wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", + key_mgmt="SAE", ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True) + eht_verify_wifi_version(wpas) + traffic_test(wpas, hapd0) + +def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1"): + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + passphrase = 'qwertyuiop' + ssid = "mld_ap_sae_two_link" + params = eht_mld_ap_wpa2_params(ssid, passphrase, + key_mgmt="SAE", mfp="2", pwe='1', + beacon_prot=beacon_prot) + + hapd0 = eht_mld_enable_ap(hapd_iface, params) + + params['channel'] = '6' + + hapd1 = eht_mld_enable_ap(hapd_iface, params) + + wpas.set("sae_pwe", "1") + wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", + key_mgmt="SAE", ieee80211w="2", beacon_prot="1") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True) + eht_verify_wifi_version(wpas) + traffic_test(wpas, hapd0) + traffic_test(wpas, hapd1) + +def test_eht_mld_sae_two_links(dev, apdev): + """EHT MLD AP with MLD client SAE H2E connection using two links""" + run_eht_mld_sae_two_links(dev, apdev) + +def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev): + """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection""" + run_eht_mld_sae_two_links(dev, apdev, beacon_prot="0") + +def test_eht_mld_sae_ext_one_link(dev, apdev): + """EHT MLD AP with MLD client SAE-EXT H2E connection using single link""" + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + passphrase = 'qwertyuiop' + ssid = "mld_ap_sae_ext_single_link" + params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY") + + hapd0 = eht_mld_enable_ap(hapd_iface, params) + + wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", + key_mgmt="SAE-EXT-KEY", ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True) + eht_verify_wifi_version(wpas) + traffic_test(wpas, hapd0) + +def test_eht_mld_sae_ext_two_links(dev, apdev): + """EHT MLD AP with MLD client SAE-EXT H2E connection using two links""" + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + passphrase = 'qwertyuiop' + ssid = "mld_ap_sae_two_link" + params = eht_mld_ap_wpa2_params(ssid, passphrase, + key_mgmt="SAE-EXT-KEY") + + hapd0 = eht_mld_enable_ap(hapd_iface, params) + + params['channel'] = '6' + + hapd1 = eht_mld_enable_ap(hapd_iface, params) + + wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", + key_mgmt="SAE-EXT-KEY", ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True) + eht_verify_wifi_version(wpas) + traffic_test(wpas, hapd0) + traffic_test(wpas, hapd1) + +def test_eht_mld_sae_legacy_client(dev, apdev): + """EHT MLD AP with legacy client SAE H2E connection""" + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface): + passphrase = 'qwertyuiop' + ssid = "mld_ap_sae_two_link" + params = eht_mld_ap_wpa2_params(ssid, passphrase, + key_mgmt="SAE", mfp="2", pwe='1') + + hapd0 = eht_mld_enable_ap(hapd_iface, params) + + params['channel'] = '6' + + hapd1 = eht_mld_enable_ap(hapd_iface, params) + + dev[0].set("sae_groups", "") + dev[0].set("sae_pwe", "1") + dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412", + key_mgmt="SAE", ieee80211w="2", beacon_prot="1") + + eht_verify_status(dev[0], hapd0, 2412, 20, is_ht=True) + traffic_test(dev[0], hapd0)