# EHT tests # Copyright (c) 2022, Qualcomm Innovation Center, Inc. # # This software may be distributed under the terms of the BSD license. # See README for more details. import hostapd from utils import * from hwsim import HWSimRadio import hwsim_utils from wpasupplicant import WpaSupplicant import re from tshark import run_tshark def eht_verify_wifi_version(dev): status = dev.get_status() logger.info("station status: " + str(status)) if 'wifi_generation' not in status: raise Exception("Missing wifi_generation information") if status['wifi_generation'] != "7": raise Exception("Unexpected wifi_generation value: " + status['wifi_generation']) def _eht_get_links_bitmap(wpas, name): vfile = "/sys/kernel/debug/ieee80211/%s/netdev:%s/%s" % \ (wpas.get_driver_status_field("phyname"), wpas.ifname, name) if wpas.cmd_execute(["ls", vfile])[0] != 0: logger_info("%s not supported in mac80211: %s" % (name, vfile)) return 0 res, out = wpas.cmd_execute(["cat", vfile], shell=True) if res != 0: raise Exception("Failed to read %s" % fname) logger.info("%s=%s" % (name, out)) return int(out, 16) def _eht_valid_links(wpas): return _eht_get_links_bitmap(wpas, "valid_links") def _eht_active_links(wpas): return _eht_get_links_bitmap(wpas, "active_links") def _eht_verify_links(wpas, valid_links=0, active_links=0): vlinks = _eht_valid_links(wpas) if vlinks != valid_links: raise Exception("Unexpected valid links (0x%04x != 0x%04x)" % (vlinks, valid_links)) alinks = _eht_active_links(wpas) if alinks != active_links: raise Exception("Unexpected active links (0x%04x != 0x%04x)" % (alinks, active_links)) def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False, mld=False, valid_links=0, active_links=0): status = hapd.get_status() logger.info("hostapd STATUS: " + str(status)) if is_ht and status["ieee80211n"] != "1": raise Exception("Unexpected STATUS ieee80211n value") if is_vht and status["ieee80211ac"] != "1": raise Exception("Unexpected STATUS ieee80211ac value") if status["ieee80211ax"] != "1": raise Exception("Unexpected STATUS ieee80211ax value") if status["ieee80211be"] != "1": raise Exception("Unexpected STATUS ieee80211be value") sta = hapd.get_sta(wpas.own_addr()) logger.info("hostapd STA: " + str(sta)) if is_ht and "[HT]" not in sta['flags']: raise Exception("Missing STA flag: HT") if is_vht and "[VHT]" not in sta['flags']: raise Exception("Missing STA flag: VHT") if "[HE]" not in sta['flags']: raise Exception("Missing STA flag: HE") if "[EHT]" not in sta['flags']: raise Exception("Missing STA flag: EHT") sig = wpas.request("SIGNAL_POLL").splitlines() # TODO: With MLD connection, signal poll logic is still not implemented. # While mac80211 maintains the station using the MLD address, the # information is maintained in the link stations, but it is not sent to # user space yet. if not mld: if "FREQUENCY=%s" % freq not in sig: raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig)) if "WIDTH=%s MHz" % bw not in sig: raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig)) # Active links are updated in async work after the connection. # Sleep a bit to allow it to run. time.sleep(0.1) _eht_verify_links(wpas, valid_links, active_links) def traffic_test(wpas, hapd, success=True): hwsim_utils.test_connectivity(wpas, hapd, success_expected=success) def test_eht_open(dev, apdev): """EHT AP with open mode configuration""" params = {"ssid": "eht", "ieee80211ax": "1", "ieee80211be": "1"} try: hapd = hostapd.add_ap(apdev[0], params) except Exception as e: if isinstance(e, Exception) and \ str(e) == "Failed to set hostapd parameter ieee80211be": raise HwsimSkip("EHT not supported") raise if hapd.get_status_field("ieee80211be") != "1": raise Exception("AP STATUS did not indicate ieee80211be=1") dev[0].connect("eht", key_mgmt="NONE", scan_freq="2412") sta = hapd.get_sta(dev[0].own_addr()) if "[EHT]" not in sta['flags']: raise Exception("Missing STA flag: EHT") status = dev[0].request("STATUS") if "wifi_generation=7" not in status: raise Exception("STA STATUS did not indicate wifi_generation=7") def test_prefer_eht_20(dev, apdev): params = {"ssid": "eht", "channel": "1", "ieee80211ax": "1", "ieee80211be" : "1", "ieee80211n": "1"} try: hapd0 = hostapd.add_ap(apdev[0], params) params["ieee80211be"] = "0" hapd1 = hostapd.add_ap(apdev[1], params) except Exception as e: if isinstance(e, Exception) and \ str(e) == "Failed to set hostapd parameter ieee80211be": raise HwsimSkip("EHT not supported") raise dev[0].connect("eht", key_mgmt="NONE") if dev[0].get_status_field('bssid') != apdev[0]['bssid']: raise Exception("dev[0] connected to unexpected AP") est = dev[0].get_bss(apdev[0]['bssid'])['est_throughput'] if est != "172103": raise Exception("Unexpected BSS1 est_throughput: " + est) def start_eht_sae_ap(apdev, ml=False, transition_mode=False): params = hostapd.wpa2_params(ssid="eht", passphrase="12345678") params["ieee80211ax"] = "1" params["ieee80211be"] = "1" params['ieee80211w'] = '1' if transition_mode else '2' params['rsn_pairwise'] = "CCMP GCMP-256" if transition_mode else "GCMP-256" params['group_cipher'] = "CCMP" if transition_mode else "GCMP-256" params["group_mgmt_cipher"] = "AES-128-CMAC" if transition_mode else "BIP-GMAC-256" params['beacon_prot'] = '1' params['wpa_key_mgmt'] = "SAE SAE-EXT-KEY" if transition_mode else 'SAE-EXT-KEY' params['sae_groups'] = "19 20" if transition_mode else "20" params['sae_pwe'] = "2" if transition_mode else "1" if ml: ml_elem = "ff0d6b" + "3001" + "0a" + "021122334455" + "01" + "00" + "00" params['vendor_elements'] = ml_elem try: hapd = hostapd.add_ap(apdev, params) except Exception as e: if isinstance(e, Exception) and \ str(e) == "Failed to set hostapd parameter ieee80211be": raise HwsimSkip("EHT not supported") raise def test_eht_sae(dev, apdev): """EHT AP with SAE""" check_sae_capab(dev[0]) hapd = start_eht_sae_ap(apdev[0]) try: dev[0].set("sae_groups", "20") dev[0].set("sae_pwe", "2") dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678", ieee80211w="2", beacon_prot="1", pairwise="GCMP-256", group="GCMP-256", group_mgmt="BIP-GMAC-256", scan_freq="2412") finally: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "0") def test_eht_sae_mlo(dev, apdev): """EHT+MLO AP with SAE""" check_sae_capab(dev[0]) hapd = start_eht_sae_ap(apdev[0], ml=True) try: dev[0].set("sae_groups", "20") dev[0].set("sae_pwe", "2") dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678", ieee80211w="2", beacon_prot="1", pairwise="GCMP-256", group="GCMP-256", group_mgmt="BIP-GMAC-256", scan_freq="2412") finally: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "0") def test_eht_sae_mlo_tm(dev, apdev): """EHT+MLO AP with SAE and transition mode""" check_sae_capab(dev[0]) check_sae_capab(dev[1]) hapd = start_eht_sae_ap(apdev[0], ml=True, transition_mode=True) try: dev[0].set("sae_groups", "20") dev[0].set("sae_pwe", "2") dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678", ieee80211w="2", beacon_prot="1", pairwise="GCMP-256", group="CCMP", group_mgmt="AES-128-CMAC", scan_freq="2412") dev[1].set("sae_groups", "19") dev[1].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678", ieee80211w="2", beacon_prot="1", pairwise="CCMP", group="CCMP", group_mgmt="AES-128-CMAC", scan_freq="2412", disable_eht="1") finally: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "0") dev[1].set("sae_groups", "") def eht_mld_enable_ap(iface, params): hapd = hostapd.add_mld_link(iface, params) hapd.enable() ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1) if ev is None: raise Exception("AP startup timed out") if "AP-ENABLED" not in ev: raise Exception("AP startup failed") return hapd def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256", mfp="2", pwe=None, beacon_prot="1"): params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase, wpa_key_mgmt=key_mgmt, ieee80211w=mfp) params['ieee80211n'] = '1' params['ieee80211ax'] = '1' params['ieee80211be'] = '1' params['channel'] = '1' params['hw_mode'] = 'g' params['group_mgmt_cipher'] = "AES-128-CMAC" params['beacon_prot'] = beacon_prot if pwe is not None: params['sae_pwe'] = pwe return params def _eht_mld_probe_req(wpas, hapd, tsf0, link_id=-1): if "OK" not in wpas.request("ML_PROBE_REQ bssid=%s mld_id=0 link_id=%d" % (hapd.own_addr(), link_id)): raise Exception("Failed to request ML probe request") ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"]) if ev is None: raise Exception("Scan did not start") ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) if ev is None: raise Exception("Scan did not complete") logger.info("ML Probe request scan done") bss = wpas.get_bss(hapd.own_addr()) if not bss: raise Exception("AP did not reply to ML probe request") tsf1 = int(bss['tsf']) logger.info("tsf0=%s, tsf1=%s" % (tsf0, tsf1)) if tsf0 >= tsf1: raise Exception("AP was not found in ML probe request scan") def test_eht_mld_discovery(dev, apdev): """EHT MLD AP discovery""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap" link0_params = {"ssid": ssid, "hw_mode": "g", "channel": "1"} link1_params = {"ssid": ssid, "hw_mode": "g", "channel": "2"} hapd0 = eht_mld_enable_ap(hapd_iface, link0_params) hapd1 = eht_mld_enable_ap(hapd_iface, link1_params) # Only scan link 0 res = wpas.request("SCAN freq=2412") if "FAIL" in res: raise Exception("Failed to start scan") ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"]) if ev is None: raise Exception("Scan did not start") ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) if ev is None: raise Exception("Scan did not complete") logger.info("Scan done") rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=", re.MULTILINE) ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE) bss = wpas.request("BSS " + hapd0.own_addr()) logger.info("BSS 0: " + str(bss)) if rnr_pattern.search(bss) is None: raise Exception("RNR element not found for first link") if ml_pattern.search(bss) is None: raise Exception("ML element not found for first link") # Save the tsf0 for checking ML Probe request scan later tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf']) if wpas.get_bss(hapd1.own_addr()) is not None: raise Exception("BSS for link 1 found without ML probe request") # Now send an ML probe request (for all links) _eht_mld_probe_req(wpas, hapd0, tsf0) tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf']) # NOTE: hostapd incorrectly reports a TSF offset of zero # This only works because the source is always the ML probe response tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf']) bss = wpas.request("BSS " + hapd1.own_addr()) logger.info("BSS 1: " + str(bss)) if rnr_pattern.search(bss) is None: raise Exception("RNR element not found for second link") if ml_pattern.search(bss) is None: raise Exception("ML element not found for second link") _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=1) if int(wpas.get_bss(hapd1.own_addr())['tsf']) <= tsf1: raise Exception("Probe for link ID did not update BSS") tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf']) tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf']) # Probing the wrong link ID should not update second link _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=4) if int(wpas.get_bss(hapd1.own_addr())['tsf']) != tsf1: raise Exception("Probe for other link ID not updated BSS") def test_eht_mld_owe_two_links(dev, apdev): """EHT MLD AP with MLD client OWE connection using two links""" with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \ HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap_owe_two_link" params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2") hapd0 = eht_mld_enable_ap(hapd0_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd0_iface, params) # Check legacy client connection dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2") wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_mld_sae_single_link(dev, apdev): """EHT MLD AP with MLD client SAE H2E connection using single link""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_single_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe='2') hapd0 = eht_mld_enable_ap(hapd_iface, params) wpas.set("sae_pwe", "1") wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1"): with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe='1', beacon_prot=beacon_prot) hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) wpas.set("sae_pwe", "1") wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", key_mgmt="SAE", ieee80211w="2", beacon_prot="1") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_mld_sae_two_links(dev, apdev): """EHT MLD AP with MLD client SAE H2E connection using two links""" run_eht_mld_sae_two_links(dev, apdev) def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev): """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection""" run_eht_mld_sae_two_links(dev, apdev, beacon_prot="0") def test_eht_mld_sae_ext_one_link(dev, apdev): """EHT MLD AP with MLD client SAE-EXT H2E connection using single link""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_ext_single_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY") hapd0 = eht_mld_enable_ap(hapd_iface, params) wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE-EXT-KEY", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) def test_eht_mld_sae_ext_two_links(dev, apdev): """EHT MLD AP with MLD client SAE-EXT H2E connection using two links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY") hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", key_mgmt="SAE-EXT-KEY", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_mld_sae_legacy_client(dev, apdev): """EHT MLD AP with legacy client SAE H2E connection""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface): passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe='1') hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) try: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "1") dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE", ieee80211w="2", beacon_prot="1") eht_verify_status(dev[0], hapd0, 2412, 20, is_ht=True) traffic_test(dev[0], hapd0) finally: dev[0].set("sae_groups", "") dev[0].set("sae_pwe", "0") def test_eht_mld_sae_transition(dev, apdev): """EHT MLD AP in SAE/PSK transition mode with MLD client connection using two links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256", mfp="1") hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", key_mgmt="SAE-EXT-KEY", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) dev[0].set("sae_groups", "") dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE", ieee80211w="2", beacon_prot="1") dev[1].connect(ssid, psk=passphrase, scan_freq="2412", key_mgmt="WPA-PSK", ieee80211w="0") def test_eht_mld_ptk_rekey(dev, apdev): """EHT MLD AP and PTK rekeying with MLD client connection using two links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256", mfp="1") params['wpa_ptk_rekey'] = '5' hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", key_mgmt="SAE-EXT-KEY", ieee80211w="2") ev0 = hapd0.wait_event(["AP-STA-CONNECT"], timeout=1) if ev0 is None: ev1 = hapd1.wait_event(["AP-STA-CONNECT"], timeout=1) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) ev = wpas.wait_event(["WPA: Key negotiation completed", "CTRL-EVENT-DISCONNECTED"], timeout=10) if ev is None: raise Exception("PTK rekey timed out") if "CTRL-EVENT-DISCONNECTED" in ev: raise Exception("Disconnect instead of rekey") time.sleep(0.1) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_mld_gtk_rekey(dev, apdev): """AP MLD and GTK rekeying with MLD client connection using two links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256", mfp="1") params['wpa_group_rekey'] = '5' hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437", key_mgmt="SAE-EXT-KEY", ieee80211w="2") ev0 = hapd0.wait_event(["AP-STA-CONNECT"], timeout=1) if ev0 is None: ev1 = hapd1.wait_event(["AP-STA-CONNECT"], timeout=1) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) for i in range(2): ev = wpas.wait_event(["MLO RSN: Group rekeying completed", "CTRL-EVENT-DISCONNECTED"], timeout=10) if ev is None: raise Exception("GTK rekey timed out") if "CTRL-EVENT-DISCONNECTED" in ev: raise Exception("Disconnect instead of rekey") #TODO: Uncomment these ones GTK rekeying works for MLO #time.sleep(0.1) #traffic_test(wpas, hapd0) #traffic_test(wpas, hapd1) def test_eht_ml_probe_req(dev, apdev): """AP MLD with two links and non-AP MLD sending ML Probe Request""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) passphrase = 'qwertyuiop' ssid = "mld_ap_sae_two_link" params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY") hapd0 = eht_mld_enable_ap(hapd_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd_iface, params) bssid = hapd0.own_addr() wpas.scan_for_bss(bssid, freq=2412) time.sleep(1) cmd = "ML_PROBE_REQ bssid=" + bssid + " mld_id=0" if "OK" not in wpas.request(cmd): raise Exception("Failed to run: " + cmd) ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS", "CTRL-EVENT-SCAN-FAILED"], timeout=10) if ev is None: raise Exception("ML_PROBE_REQ did not result in scan results") time.sleep(1) cmd = "ML_PROBE_REQ bssid=" + bssid + " mld_id=0 link_id=2" if "OK" not in wpas.request(cmd): raise Exception("Failed to run: " + cmd) ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS", "CTRL-EVENT-SCAN-FAILED"], timeout=10) if ev is None: raise Exception("ML_PROBE_REQ did not result in scan results") def test_eht_mld_connect_probes(dev, apdev, params): """MLD client sends ML probe to connect to not discovered links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap" passphrase = 'qwertyuiop' link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2", key_mgmt="SAE", pwe='2') link_params['channel'] = '1' link_params['bssid'] = '00:11:22:33:44:01' hapd0 = eht_mld_enable_ap(hapd_iface, link_params) link_params['channel'] = '6' link_params['bssid'] = '00:11:22:33:44:02' hapd1 = eht_mld_enable_ap(hapd_iface, link_params) wpas.set("sae_pwe", "1") wpas.connect(ssid, sae_password= passphrase, ieee80211w="2", key_mgmt="SAE", scan_freq="2412") out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'), 'wlan.fc.type_subtype == 0x0004 && wlan.ext_tag.number == 107 && wlan.ext_tag.data == 11:00:02:00:00:02:11:00', display=['frame.number']) if not out.splitlines(): raise Exception('ML probe request not found') # Probe Response frame has the ML element, which will be fragmented out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"), "wlan.fc.type_subtype == 0x0005 && wlan.ext_tag.number == 107 && wlan.ext_tag.length == 254", display=['frame.number']) if not out.splitlines(): # This requires new tshark (e.g., 4.0.6); for now, ignore the issue # to avoid forcing such upgrade. logger.info('ML probe response not found') #raise Exception('ML probe response not found') eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_tx_link_rejected_connect_other(dev, apdev, params): """EHT MLD AP with MLD client being rejected on TX link, but then connecting on second link""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap" passphrase = 'qwertyuiop' link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2", key_mgmt="SAE", pwe='2') link_params['channel'] = '1' link_params['bssid'] = '00:11:22:33:44:01' hapd0 = eht_mld_enable_ap(hapd_iface, link_params) link_params['channel'] = '6' link_params['bssid'] = '00:11:22:33:44:02' hapd1 = eht_mld_enable_ap(hapd_iface, link_params) wpas.set("sae_pwe", "1") with fail_test(hapd0, 1, "hostapd_get_aid"): wpas.connect(ssid, sae_password=passphrase, ieee80211w="2", key_mgmt="SAE", scan_freq="2412") eht_verify_status(wpas, hapd1, 2437, 20, is_ht=True, mld=True, valid_links=2, active_links=2) traffic_test(wpas, hapd0) traffic_test(wpas, hapd1) def test_eht_all_links_rejected(dev, apdev, params): """EHT MLD AP with MLD client ignores all rejected links""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap" passphrase = 'qwertyuiop' link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2", key_mgmt="SAE", pwe='2') link_params['channel'] = '1' link_params['bssid'] = '00:11:22:33:44:01' hapd0 = eht_mld_enable_ap(hapd_iface, link_params) link_params['channel'] = '6' link_params['bssid'] = '00:11:22:33:44:02' hapd1 = eht_mld_enable_ap(hapd_iface, link_params) wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01") wpas.set("sae_pwe", "1") with fail_test(hapd0, 1, "hostapd_get_aid", 1, "hostapd_process_assoc_ml_info"): wpas.connect(ssid, sae_password=passphrase, ieee80211w="2", key_mgmt="SAE", scan_freq="2412", wait_connect=False) ev = wpas.wait_event(['CTRL-EVENT-ASSOC-REJECT']) if not ev: raise Exception('Rejection not found') ev1 = wpas.wait_event(['Added BSSID']) ev2 = wpas.wait_event(['Added BSSID']) if (not ev1 or not ev2) or \ not ((hapd0.own_addr() in ev1 and hapd1.own_addr() in ev2) or (hapd1.own_addr() in ev1 and hapd0.own_addr() in ev2)): raise Exception('Not all BSSs were added to the ignore list') # After this message, a new scan clears the ignore and the STA # connects. wpas.wait_connected(timeout=15) def test_eht_connect_invalid_link(dev, apdev, params): """EHT MLD AP where one link is incorrectly configured and rejected by mac80211""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap" passphrase = 'qwertyuiop' ssid = "mld_ap" passphrase = 'qwertyuiop' link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2", key_mgmt="SAE", pwe='2') link_params['channel'] = '1' link_params['bssid'] = '00:11:22:33:44:01' hapd0 = eht_mld_enable_ap(hapd_iface, link_params) link_params['channel'] = '6' link_params['bssid'] = '00:11:22:33:44:02' hapd1 = eht_mld_enable_ap(hapd_iface, link_params) # We scan for both APs, then try to connect to link 0, but only the # second attempt will work if mac80211 rejects the second link. wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01") wpas.set("sae_pwe", "1") with fail_test(wpas, 1, "assoc;wpa_driver_nl80211_associate", 2, "link;wpa_driver_nl80211_associate"): wpas.connect(ssid, sae_password=passphrase, ieee80211w="2", key_mgmt="SAE", scan_freq="2412") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1) out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'), 'wlan.fc.type_subtype == 0x0000 && wlan.ext_tag.data == 00:01:09:%s:00:00' % wpas.own_addr(), display=['frame.number']) if not out.splitlines(): raise Exception('Association request send by mac80211 had unexpected ML element content (probably it contained a second link)') def test_eht_mld_link_removal(dev, apdev): """EHT MLD with two links. Links removed during association""" with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas.interface_add(wpas_iface) ssid = "mld_ap_owe_two_link" params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2") hapd0 = eht_mld_enable_ap(hapd0_iface, params) params['channel'] = '6' hapd1 = eht_mld_enable_ap(hapd0_iface, params) wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE", ieee80211w="2") eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) eht_verify_wifi_version(wpas) traffic_test(wpas, hapd0) logger.info("Disable the 2nd link in 4 beacon intervals") hapd1.link_remove(4) time.sleep(0.6) logger.info("Test traffic after 2nd link disabled") traffic_test(wpas, hapd0) if "OK" not in hapd0.request("REKEY_GTK"): raise Exception("REKEY_GTK failed") ev = wpas.wait_event(["MLO RSN: Group rekeying completed"], timeout=2) if ev is None: raise Exception("GTK rekey timed out") traffic_test(wpas, hapd0) logger.info("Disable the 1st link in 20 beacon intervals") hapd0.link_remove(20) time.sleep(1) logger.info("Verify that traffic is valid before the link is removed") traffic_test(wpas, hapd0) time.sleep(2) logger.info("Test traffic after 1st link disabled") traffic_test(wpas, hapd0, success=False)