# EHT tests
# Copyright (c) 2022, Qualcomm Innovation Center, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import hostapd
from utils import *
from hwsim import HWSimRadio
import hwsim_utils
from wpasupplicant import WpaSupplicant
import re
from tshark import run_tshark

def eht_verify_wifi_version(dev):
    status = dev.get_status()
    logger.info("station status: " + str(status))

    if 'wifi_generation' not in status:
        raise Exception("Missing wifi_generation information")
    if status['wifi_generation'] != "7":
        raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])

def _eht_get_links_bitmap(wpas, name):
    vfile = "/sys/kernel/debug/ieee80211/%s/netdev:%s/%s" % \
        (wpas.get_driver_status_field("phyname"), wpas.ifname, name)

    if wpas.cmd_execute(["ls", vfile])[0] != 0:
        logger_info("%s not supported in mac80211: %s" % (name, vfile))
        return 0

    res, out = wpas.cmd_execute(["cat", vfile], shell=True)
    if res != 0:
        raise Exception("Failed to read %s" % fname)

    logger.info("%s=%s" % (name, out))
    return int(out, 16)

def _eht_valid_links(wpas):
    return _eht_get_links_bitmap(wpas, "valid_links")

def _eht_active_links(wpas):
    return _eht_get_links_bitmap(wpas, "active_links")

def _eht_dormant_links(wpas):
    return _eht_get_links_bitmap(wpas, "dormant_links")

def _eht_verify_links(wpas, valid_links=0, active_links=0):
    vlinks = _eht_valid_links(wpas)
    if vlinks != valid_links:
        raise Exception("Unexpected valid links (0x%04x != 0x%04x)" % (vlinks, valid_links))

    alinks = _eht_active_links(wpas)
    if alinks != active_links:
        raise Exception("Unexpected active links (0x%04x != 0x%04x)" % (alinks, active_links))

def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False,
                      mld=False, valid_links=0, active_links=0):
    status = hapd.get_status()

    logger.info("hostapd STATUS: " + str(status))
    if is_ht and status["ieee80211n"] != "1":
        raise Exception("Unexpected STATUS ieee80211n value")
    if is_vht and status["ieee80211ac"] != "1":
        raise Exception("Unexpected STATUS ieee80211ac value")
    if status["ieee80211ax"] != "1":
        raise Exception("Unexpected STATUS ieee80211ax value")
    if status["ieee80211be"] != "1":
        raise Exception("Unexpected STATUS ieee80211be value")

    sta = hapd.get_sta(wpas.own_addr())
    logger.info("hostapd STA: " + str(sta))
    if is_ht and "[HT]" not in sta['flags']:
        raise Exception("Missing STA flag: HT")
    if is_vht and "[VHT]" not in sta['flags']:
        raise Exception("Missing STA flag: VHT")
    if "[HE]" not in sta['flags']:
        raise Exception("Missing STA flag: HE")
    if "[EHT]" not in sta['flags']:
        raise Exception("Missing STA flag: EHT")

    sig = wpas.request("SIGNAL_POLL").splitlines()

    # TODO: With MLD connection, signal poll logic is still not implemented.
    # While mac80211 maintains the station using the MLD address, the
    # information is maintained in the link stations, but it is not sent to
    # user space yet.
    if not mld:
        if "FREQUENCY=%s" % freq not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=%s MHz" % bw not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))

    # Active links are updated in async work after the connection.
    # Sleep a bit to allow it to run.
    time.sleep(0.1)
    _eht_verify_links(wpas, valid_links, active_links)

def traffic_test(wpas, hapd, success=True):
    hwsim_utils.test_connectivity(wpas, hapd, success_expected=success)

def test_eht_open(dev, apdev):
    """EHT AP with open mode configuration"""
    params = {"ssid": "eht",
              "ieee80211ax": "1",
              "ieee80211be": "1"}
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if isinstance(e, Exception) and \
           str(e) == "Failed to set hostapd parameter ieee80211be":
            raise HwsimSkip("EHT not supported")
        raise
    if hapd.get_status_field("ieee80211be") != "1":
        raise Exception("AP STATUS did not indicate ieee80211be=1")
    dev[0].connect("eht", key_mgmt="NONE", scan_freq="2412")
    sta = hapd.get_sta(dev[0].own_addr())
    if "[EHT]" not in sta['flags']:
        raise Exception("Missing STA flag: EHT")
    status = dev[0].request("STATUS")
    if "wifi_generation=7" not in status:
        raise Exception("STA STATUS did not indicate wifi_generation=7")

def test_prefer_eht_20(dev, apdev):
    params = {"ssid": "eht",
              "channel": "1",
              "ieee80211ax": "1",
              "ieee80211be" : "1",
              "ieee80211n": "1"}
    try:
        hapd0 = hostapd.add_ap(apdev[0], params)

        params["ieee80211be"] = "0"
        hapd1 = hostapd.add_ap(apdev[1], params)
    except Exception as e:
        if isinstance(e, Exception) and \
           str(e) == "Failed to set hostapd parameter ieee80211be":
            raise HwsimSkip("EHT not supported")
        raise

    dev[0].connect("eht", key_mgmt="NONE")
    if dev[0].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[0] connected to unexpected AP")

    est = dev[0].get_bss(apdev[0]['bssid'])['est_throughput']
    if est != "172103":
      raise Exception("Unexpected BSS1 est_throughput: " + est)

def start_eht_sae_ap(apdev, ml=False, transition_mode=False):
    params = hostapd.wpa2_params(ssid="eht", passphrase="12345678")
    params["ieee80211ax"] = "1"
    params["ieee80211be"] = "1"
    params['ieee80211w'] = '1' if transition_mode else '2'
    params['rsn_pairwise'] = "CCMP GCMP-256" if transition_mode else "GCMP-256"
    params['group_cipher'] = "CCMP" if transition_mode else "GCMP-256"
    params["group_mgmt_cipher"] = "AES-128-CMAC" if transition_mode else "BIP-GMAC-256"
    params['beacon_prot'] = '1'
    params['wpa_key_mgmt'] = "SAE SAE-EXT-KEY" if transition_mode else 'SAE-EXT-KEY'
    params['sae_groups'] = "19 20" if transition_mode else "20"
    params['sae_pwe'] = "2" if transition_mode else "1"
    if ml:
        ml_elem = "ff0d6b" + "3001" + "0a" + "021122334455" + "01" + "00" + "00"
        params['vendor_elements'] = ml_elem
    try:
        hapd = hostapd.add_ap(apdev, params)
    except Exception as e:
        if isinstance(e, Exception) and \
           str(e) == "Failed to set hostapd parameter ieee80211be":
            raise HwsimSkip("EHT not supported")
        raise

def test_eht_sae(dev, apdev):
    """EHT AP with SAE"""
    check_sae_capab(dev[0])

    hapd = start_eht_sae_ap(apdev[0])
    try:
        dev[0].set("sae_groups", "20")
        dev[0].set("sae_pwe", "2")
        dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                       ieee80211w="2", beacon_prot="1",
                       pairwise="GCMP-256", group="GCMP-256",
                       group_mgmt="BIP-GMAC-256", scan_freq="2412")
    finally:
        dev[0].set("sae_groups", "")
        dev[0].set("sae_pwe", "0")

def test_eht_sae_mlo(dev, apdev):
    """EHT+MLO AP with SAE"""
    check_sae_capab(dev[0])

    hapd = start_eht_sae_ap(apdev[0], ml=True)
    try:
        dev[0].set("sae_groups", "20")
        dev[0].set("sae_pwe", "2")
        dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                       ieee80211w="2", beacon_prot="1",
                       pairwise="GCMP-256", group="GCMP-256",
                       group_mgmt="BIP-GMAC-256", scan_freq="2412")
    finally:
        dev[0].set("sae_groups", "")
        dev[0].set("sae_pwe", "0")

def test_eht_sae_mlo_tm(dev, apdev):
    """EHT+MLO AP with SAE and transition mode"""
    check_sae_capab(dev[0])
    check_sae_capab(dev[1])

    hapd = start_eht_sae_ap(apdev[0], ml=True, transition_mode=True)
    try:
        dev[0].set("sae_groups", "20")
        dev[0].set("sae_pwe", "2")
        dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                       ieee80211w="2", beacon_prot="1",
                       pairwise="GCMP-256", group="CCMP",
                       group_mgmt="AES-128-CMAC", scan_freq="2412")
        dev[1].set("sae_groups", "19")
        dev[1].connect("eht", key_mgmt="SAE-EXT-KEY", psk="12345678",
                       ieee80211w="2", beacon_prot="1",
                       pairwise="CCMP", group="CCMP",
                       group_mgmt="AES-128-CMAC", scan_freq="2412",
                       disable_eht="1")
    finally:
        dev[0].set("sae_groups", "")
        dev[0].set("sae_pwe", "0")
        dev[1].set("sae_groups", "")

def eht_mld_enable_ap(iface, params):
    hapd = hostapd.add_mld_link(iface, params)
    hapd.enable()

    ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1)
    if ev is None:
        raise Exception("AP startup timed out")
    if "AP-ENABLED" not in ev:
        raise Exception("AP startup failed")

    return hapd

def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256",
                           mfp="2", pwe=None, beacon_prot="1"):
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                 wpa_key_mgmt=key_mgmt, ieee80211w=mfp)
    params['ieee80211n'] = '1'
    params['ieee80211ax'] = '1'
    params['ieee80211be'] = '1'
    params['channel'] = '1'
    params['hw_mode'] = 'g'
    params['group_mgmt_cipher'] = "AES-128-CMAC"
    params['beacon_prot'] = beacon_prot

    if pwe is not None:
        params['sae_pwe'] = pwe

    return params

def _eht_mld_probe_req(wpas, hapd, tsf0, link_id=-1):
    if "OK" not in wpas.request("ML_PROBE_REQ bssid=%s mld_id=0 link_id=%d" % (hapd.own_addr(), link_id)):
        raise Exception("Failed to request ML probe request")

    ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
    if ev is None:
        raise Exception("Scan did not start")

    ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
    if ev is None:
        raise Exception("Scan did not complete")

    logger.info("ML Probe request scan done")

    bss = wpas.get_bss(hapd.own_addr())
    if not bss:
        raise Exception("AP did not reply to ML probe request")

    tsf1 = int(bss['tsf'])
    logger.info("tsf0=%s, tsf1=%s" % (tsf0, tsf1))

    if tsf0 >= tsf1:
        raise Exception("AP was not found in ML probe request scan")

def test_eht_mld_discovery(dev, apdev):
    """EHT MLD AP discovery"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        link0_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "1"}
        link1_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "2"}

        hapd0 = eht_mld_enable_ap(hapd_iface, link0_params)
        hapd1 = eht_mld_enable_ap(hapd_iface, link1_params)

        # Only scan link 0
        res = wpas.request("SCAN freq=2412")
        if "FAIL" in res:
            raise Exception("Failed to start scan")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
        if ev is None:
            raise Exception("Scan did not start")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
        if ev is None:
            raise Exception("Scan did not complete")

        logger.info("Scan done")

        rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
                                 re.MULTILINE)
        ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)

        bss = wpas.request("BSS " + hapd0.own_addr())
        logger.info("BSS 0: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for first link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for first link")

        # Save the tsf0 for checking ML Probe request scan later
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        if wpas.get_bss(hapd1.own_addr()) is not None:
            raise Exception("BSS for link 1 found without ML probe request")

        # Now send an ML probe request (for all links)
        _eht_mld_probe_req(wpas, hapd0, tsf0)
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        # NOTE: hostapd incorrectly reports a TSF offset of zero
        # This only works because the source is always the ML probe response
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        bss = wpas.request("BSS " + hapd1.own_addr())
        logger.info("BSS 1: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for second link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for second link")

        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=1)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) <= tsf1:
            raise Exception("Probe for link ID did not update BSS")
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        # Probing the wrong link ID should not update second link
        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=4)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) != tsf1:
            raise Exception("Probe for other link ID not updated BSS")

def test_eht_mld_owe_two_links(dev, apdev):
    """EHT MLD AP with MLD client OWE connection using two links"""
    _eht_mld_owe_two_links(dev, apdev)

def _eht_mld_owe_two_links(dev, apdev, second_link_disabled=False):
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")

        hapd0 = eht_mld_enable_ap(hapd0_iface, params)

        params['channel'] = '6'
        if second_link_disabled:
            params['mld_indicate_disabled'] = '1'

        hapd1 = eht_mld_enable_ap(hapd0_iface, params)
        # Check legacy client connection
        dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2")
        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")

        active_links = 3
        if second_link_disabled:
            dlinks = _eht_dormant_links(wpas)
            if dlinks != 2:
                raise Exception("Unexpected dormant links")
            active_links = 1

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=active_links)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        if not second_link_disabled:
            traffic_test(wpas, hapd1)

def test_eht_mld_owe_two_links_one_disabled(dev, apdev):
    """AP MLD with MLD client OWE connection when one of the AP MLD links is disabled"""
    _eht_mld_owe_two_links(dev, apdev, second_link_disabled=True)

def test_eht_mld_sae_single_link(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using single link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
            HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_single_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE",
                                        mfp="2", pwe='2')

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1"):
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1',
                                        beacon_prot=beacon_prot)

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        wpas.set("sae_pwe", "1")

        # The first authentication attempt tries to use group 20 and the
        # authentication is expected to fail. The next authentication should
        # use group 19 and succeed.
        wpas.set("sae_groups", "20 19")

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE", ieee80211w="2", beacon_prot="1")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)

        if wpas.get_status_field('sae_group') != '19':
            raise Exception("Expected SAE group not used")

        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

def test_eht_mld_sae_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links"""
    run_eht_mld_sae_two_links(dev, apdev)

def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection"""
    run_eht_mld_sae_two_links(dev, apdev, beacon_prot="0")

def test_eht_mld_sae_ext_one_link(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using single link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_ext_single_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY")

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

def test_eht_mld_sae_ext_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY")

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

def test_eht_mld_sae_legacy_client(dev, apdev):
    """EHT MLD AP with legacy client SAE H2E connection"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1')

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        try:
            dev[0].set("sae_groups", "")
            dev[0].set("sae_pwe", "1")
            dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412",
                           key_mgmt="SAE", ieee80211w="2", beacon_prot="1")

            eht_verify_status(dev[0], hapd0, 2412, 20, is_ht=True)
            traffic_test(dev[0], hapd0)
        finally:
            dev[0].set("sae_groups", "")
            dev[0].set("sae_pwe", "0")

def test_eht_mld_sae_transition(dev, apdev):
    """EHT MLD AP in SAE/PSK transition mode with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
                                        mfp="1")

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

        dev[0].set("sae_groups", "")
        dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412",
                       key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
        dev[1].connect(ssid, psk=passphrase, scan_freq="2412",
                       key_mgmt="WPA-PSK", ieee80211w="0")

def test_eht_mld_ptk_rekey(dev, apdev):
    """EHT MLD AP and PTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
                                        mfp="1")
        params['wpa_ptk_rekey'] = '5'

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        ev0 = hapd0.wait_event(["AP-STA-CONNECT"], timeout=1)
        if ev0 is None:
            ev1 = hapd1.wait_event(["AP-STA-CONNECT"], timeout=1)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

        ev = wpas.wait_event(["WPA: Key negotiation completed",
                              "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if ev is None:
            raise Exception("PTK rekey timed out")
        if "CTRL-EVENT-DISCONNECTED" in ev:
            raise Exception("Disconnect instead of rekey")

        time.sleep(0.1)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

def test_eht_mld_gtk_rekey(dev, apdev):
    """AP MLD and GTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
                                        mfp="1")
        params['wpa_group_rekey'] = '5'

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        ev0 = hapd0.wait_event(["AP-STA-CONNECT"], timeout=1)
        if ev0 is None:
            ev1 = hapd1.wait_event(["AP-STA-CONNECT"], timeout=1)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

        for i in range(2):
            ev = wpas.wait_event(["MLO RSN: Group rekeying completed",
                                  "CTRL-EVENT-DISCONNECTED"], timeout=10)
            if ev is None:
                raise Exception("GTK rekey timed out")
            if "CTRL-EVENT-DISCONNECTED" in ev:
                raise Exception("Disconnect instead of rekey")

            #TODO: Uncomment these ones GTK rekeying works for MLO
            #time.sleep(0.1)
            #traffic_test(wpas, hapd0)
            #traffic_test(wpas, hapd1)

def test_eht_ml_probe_req(dev, apdev):
    """AP MLD with two links and non-AP MLD sending ML Probe Request"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        passphrase = 'qwertyuiop'
        ssid = "mld_ap_sae_two_link"
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY")

        hapd0 = eht_mld_enable_ap(hapd_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd_iface, params)

        bssid = hapd0.own_addr()
        wpas.scan_for_bss(bssid, freq=2412)

        time.sleep(1)
        cmd = "ML_PROBE_REQ bssid=" + bssid + " mld_id=0"
        if "OK" not in wpas.request(cmd):
            raise Exception("Failed to run: " + cmd)
        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS",
                              "CTRL-EVENT-SCAN-FAILED"], timeout=10)
        if ev is None:
            raise Exception("ML_PROBE_REQ did not result in scan results")

        time.sleep(1)
        cmd = "ML_PROBE_REQ bssid=" + bssid + " mld_id=0 link_id=2"
        if "OK" not in wpas.request(cmd):
            raise Exception("Failed to run: " + cmd)
        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS",
                              "CTRL-EVENT-SCAN-FAILED"], timeout=10)
        if ev is None:
            raise Exception("ML_PROBE_REQ did not result in scan results")

def test_eht_mld_connect_probes(dev, apdev, params):
    """MLD client sends ML probe to connect to not discovered links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)

        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password= passphrase, ieee80211w="2",
                     key_mgmt="SAE", scan_freq="2412")

        out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'),
                         'wlan.fc.type_subtype == 0x0004 && wlan.ext_tag.number == 107 && wlan.ext_tag.data == 11:00:02:00:00:02:11:00',
                         display=['frame.number'])
        if not out.splitlines():
            raise Exception('ML probe request not found')

        # Probe Response frame has the ML element, which will be fragmented
        out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                         "wlan.fc.type_subtype == 0x0005 && wlan.ext_tag.number == 107 && wlan.ext_tag.length == 254",
                         display=['frame.number'])
        if not out.splitlines():
            # This requires new tshark (e.g., 4.0.6); for now, ignore the issue
            # to avoid forcing such upgrade.
            logger.info('ML probe response not found')
            #raise Exception('ML probe response not found')

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

def test_eht_tx_link_rejected_connect_other(dev, apdev, params):
    """EHT MLD AP with MLD client being rejected on TX link, but then connecting on second link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)

        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        wpas.set("sae_pwe", "1")
        with fail_test(hapd0, 1, "hostapd_get_aid"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412")

        eht_verify_status(wpas, hapd1, 2437, 20, is_ht=True, mld=True,
                          valid_links=2, active_links=2)
        traffic_test(wpas, hapd0)
        traffic_test(wpas, hapd1)

def test_eht_all_links_rejected(dev, apdev, params):
    """EHT MLD AP with MLD client ignores all rejected links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)

        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)
        wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        wpas.set("sae_pwe", "1")

        with fail_test(hapd0, 1, "hostapd_get_aid",
                       1, "hostapd_process_assoc_ml_info"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412", wait_connect=False)
            ev = wpas.wait_event(['CTRL-EVENT-ASSOC-REJECT'])
            if not ev:
                raise Exception('Rejection not found')

            ev1 = wpas.wait_event(['Added BSSID'])
            ev2 = wpas.wait_event(['Added BSSID'])
            if (not ev1 or not ev2) or \
                not ((hapd0.own_addr() in ev1 and hapd1.own_addr() in ev2) or
                     (hapd1.own_addr() in ev1 and hapd0.own_addr() in ev2)):
                raise Exception('Not all BSSs were added to the ignore list')

            # After this message, a new scan clears the ignore and the STA
            # connects.
            wpas.wait_connected(timeout=15)

def test_eht_connect_invalid_link(dev, apdev, params):
    """EHT MLD AP where one link is incorrectly configured and rejected by mac80211"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        ssid = "mld_ap"
        passphrase = 'qwertyuiop'
        link_params = eht_mld_ap_wpa2_params(ssid, passphrase, mfp="2",
                                             key_mgmt="SAE", pwe='2')
        link_params['channel'] = '1'
        link_params['bssid'] = '00:11:22:33:44:01'
        hapd0 = eht_mld_enable_ap(hapd_iface, link_params)

        link_params['channel'] = '6'
        link_params['bssid'] = '00:11:22:33:44:02'
        hapd1 = eht_mld_enable_ap(hapd_iface, link_params)

        # We scan for both APs, then try to connect to link 0, but only the
        # second attempt will work if mac80211 rejects the second link.
        wpas.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        wpas.set("sae_pwe", "1")
        with fail_test(wpas, 1, "assoc;wpa_driver_nl80211_associate",
                             2, "link;wpa_driver_nl80211_associate"):
            wpas.connect(ssid, sae_password=passphrase, ieee80211w="2",
                         key_mgmt="SAE", scan_freq="2412")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)

        out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'),
                         'wlan.fc.type_subtype == 0x0000 && wlan.ext_tag.data == 00:01:09:%s:00:00' % wpas.own_addr(),
                         display=['frame.number'])
        if not out.splitlines():
            raise Exception('Association request send by mac80211 had unexpected ML element content (probably it contained a second link)')

def test_eht_mld_link_removal(dev, apdev):
    """EHT MLD with two links. Links removed during association"""

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        hapd0 = eht_mld_enable_ap(hapd0_iface, params)

        params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd0_iface, params)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        logger.info("Disable the 2nd link in 4 beacon intervals")
        hapd1.link_remove(4)
        time.sleep(0.6)

        logger.info("Test traffic after 2nd link disabled")
        traffic_test(wpas, hapd0)

        if "OK" not in hapd0.request("REKEY_GTK"):
            raise Exception("REKEY_GTK failed")

        ev = wpas.wait_event(["MLO RSN: Group rekeying completed"], timeout=2)
        if ev is None:
            raise Exception("GTK rekey timed out")

        traffic_test(wpas, hapd0)

        logger.info("Disable the 1st link in 20 beacon intervals")
        hapd0.link_remove(20)
        time.sleep(1)

        logger.info("Verify that traffic is valid before the link is removed")
        traffic_test(wpas, hapd0)
        time.sleep(2)

        logger.info("Test traffic after 1st link disabled")
        traffic_test(wpas, hapd0, success=False)

def test_eht_mld_bss_trans_mgmt_link_removal_imminent(dev, apdev):
    """EHT MLD with two links. BSS transition management with link removal imminent"""

    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        params["bss_transition"] = "1"
        params["mbo"] = "1"

        hapd0 = eht_mld_enable_ap(hapd0_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd0_iface, params)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        hapd0.wait_sta()
        hapd1.wait_sta()
        traffic_test(wpas, hapd0)

        addr = wpas.own_addr()
        cmd = "BSS_TM_REQ " + addr + " disassoc_timer=3 disassoc_imminent=1 link_removal_imminent=1 bss_term=0,1"
        if "OK" not in hapd0.request(cmd):
            raise Exception("BSS_TM_REQ command failed")

        # Only one link is terminate, so the STA is expected to remain
        # associated and not start a scan.
        ev = hapd0.wait_event(['BSS-TM-RESP'], timeout=5)
        # For now, allow this to pass without the BSS TM response since that
        # functionality with MLD needs a recent kernel change.
        #if ev is None:
        #    raise Exception("No BSS TM response received")
        if ev and "status_code=0" not in ev:
            raise Exception("Unexpected BSS TM response contents: " + ev)

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED",
                              "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if ev is not None:
            raise Exception("Unexpected action on STA: " + ev)

def send_check(hapd, frame, no_tx_status=False):
        cmd = "MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame="
        hapd.request(cmd + frame)
        if no_tx_status:
            return
        ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=1)
        if ev is None:
            raise Exception("No TX status")

def test_eht_ap_mld_proto(dev, apdev):
    """AP MLD protocol testing"""
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface):

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")

        hapd0 = eht_mld_enable_ap(hapd0_iface, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd0_iface, params)

        ap_mld_addr = hapd0.get_status_field("mld_addr[0]").replace(':', '')
        bssid0 = hapd0.own_addr().replace(':', '')
        bssid1 = hapd1.own_addr().replace(':', '')

        time.sleep(1)
        hapd0.set("ext_mgmt_frame_handling", "1")
        hapd1.set("ext_mgmt_frame_handling", "1")

        # Truncated EML missing MLD Capabilities And operations field
        hapd0.note("Truncated EML missing MLD Capabilities And operations field")
        addr0 = "021122334400"
        addr1 = "021122334401"
        mld_addr = "02112233440f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        ssid = "00136d6c645f61705f6f77655f74776f5f6c696e6b"
        supp_rates = "010802040b160c121824"
        ext_supp_rates = "32043048606c"
        rsne = "301a0100000fac040100000fac040100000fac12cc000000000fac06"
        ht_capab = "2d1afe131bffff000000000000000000000100000000000000000000"
        ext_capab = "7f0a04004a02014000400001"
        he_capab = "ff16230178c81a400000bfce0000000000000000fafffaff"
        eht_capab = "ff126c07007c0000feffff7f0100888888880000"
        supp_op_classes = "3b155151525354737475767778797a7b7c7d7e7f808182"
        dh_param = "ff23201300ea85e693343a079500cf4d461011a0ff90ec4de1af40165adbea94a3f36eb071"
        wmm = "dd070050f202000100"
        assocreq_start = "3004" + "0500" + ssid + supp_rates + ext_supp_rates + rsne + ht_capab + ext_capab + he_capab
        assocreq_end = eht_capab + supp_op_classes + dh_param + wmm

        # --> Not enough bytes for common info
        mle = "ff0a6b000109" + mld_addr
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Truncated Non-Inheritance element
        hapd0.note("Truncated Non-Inheritance element")
        addr0 = "021122334410"
        addr1 = "021122334411"
        mld_addr = "02112233441f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # --> MLD: Invalid inheritance
        mle = "ff7d6b000109" + mld_addr + "0000"
        mle += "0067" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inhericance element
        mle += "ff023800"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end,
                   no_tx_status=True)

        # Empty Non-Inheritance element
        hapd0.note("Empty Non-Inheritance element")
        addr0 = "021122334420"
        addr1 = "021122334421"
        mld_addr = "02112233442f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff7e6b000109" + mld_addr + "0000"
        mle += "0068" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inhericance element
        mle += "ff03380000"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Non-Inheritance element
        hapd0.note("Non-Inheritance element")
        addr0 = "021122334430"
        addr1 = "021122334431"
        mld_addr = "02112233443f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff9e6b000109" + mld_addr + "0000"
        mle += "0088" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inhericance element
        mle += "ff2338" + "1010032a362137387172756b548bedeff0" + "106b01020304050607080c0d21643b3a36"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # No Non-Inheritance element
        hapd0.note("No Non-Inheritance element")
        addr0 = "021122334440"
        addr1 = "021122334441"
        mld_addr = "02112233444f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff716b000109" + mld_addr + "0000"
        mle += "0063" + "3100" + "07" + addr1
        mle += "3004010802040b160c12182432043048606c2d1afe131bffff000000000000000000000100000000000000000000ff16230178c81a400000bfce0000000000000000fafffaffff126c07007c0000feffff7f0100888888880000"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

def _5ghz_chanwidth_to_bw(op):
    return {
        0: "40",
        1: "80",
        2: "160",
        3: "80+80",
    }.get(op, "20")

def _test_eht_5ghz(dev, apdev, channel, chanwidth, ccfs1, ccfs2=0,
                   eht_oper_puncturing_override=None):
    try:
        params = {"ssid": "eht",
                  "country_code": "US",
                  "hw_mode": "a",
                  "channel": str(channel),
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "ieee80211be": "1",
                  "vht_oper_chwidth": str(chanwidth),
                  "vht_oper_centr_freq_seg0_idx": str(ccfs1),
                  "vht_oper_centr_freq_seg1_idx": str(ccfs2),
                  "he_oper_chwidth": str(chanwidth),
                  "he_oper_centr_freq_seg1_idx": str(ccfs2),
                  "he_oper_centr_freq_seg0_idx": str(ccfs1),
                  "eht_oper_centr_freq_seg0_idx": str(ccfs1),
                  "eht_oper_chwidth": str(chanwidth)}

        if chanwidth == 0:
            if channel == ccfs1:
                  bw = "20"
            elif channel < ccfs1:
                  params["ht_capab"] = "[HT40+]"
            else:
                  params["ht_capab"] = "[HT40-]"
        else:
                  params["ht_capab"] = "[HT40+]"
                  if chanwidth == 2:
                      params["vht_capab"] = "[VHT160]"
                  elif chanwidth == 3:
                      params["vht_capab"] = "[VHT160-80PLUS80]"

        if eht_oper_puncturing_override:
            params['eht_oper_puncturing_override'] = eht_oper_puncturing_override

        freq = 5000 + channel * 5
        if chanwidth != 0 or channel != ccfs1:
            bw = _5ghz_chanwidth_to_bw(chanwidth)

        hapd = hostapd.add_ap(apdev[0], params)
        dev[0].connect("eht", key_mgmt="NONE", scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(dev[0], hapd, freq, bw, is_ht=True, is_vht=True)
        eht_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)

        if eht_oper_puncturing_override:
            hapd.set("eht_oper_puncturing_override", "0x0")
            hapd.request("UPDATE_BEACON")
            time.sleep(1)
    finally:
        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected()
        hapd.wait_sta_disconnect()
        set_world_reg(apdev[0], None, dev[0])

def test_eht_5ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 5 GHz"""
    _test_eht_5ghz(dev, apdev, 36, 0, 36, 0)

def test_eht_5ghz_40mhz_low(dev, apdev):
    """EHT with 40 MHz channel width on 5 GHz - secondary channel above"""
    _test_eht_5ghz(dev, apdev, 36, 0, 38, 0)

def test_eht_5ghz_40mhz_high(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - secondary channel below"""
    _test_eht_5ghz(dev, apdev, 40, 0, 38, 0)

def test_eht_5ghz_80mhz_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149"""
    _test_eht_5ghz(dev, apdev, 36, 1, 42, 0)

def test_eht_5ghz_80mhz_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149"""
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0)

def test_eht_5ghz_80mhz_puncturing_override_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=36 - puncturing override (2nd)"""

    # The 2nd 20 MHz is punctured
    _test_eht_5ghz(dev, apdev, 36, 1, 42, 0,
                   eht_oper_puncturing_override="0x0002")

def test_eht_5ghz_80mhz_puncturing_override_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (3rd)"""

    # The 3rd 20 MHz is punctured
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0,
                   eht_oper_puncturing_override="0x0004")

def test_eht_5ghz_80mhz_puncturing_override_3(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (4th)"""

    # The 4th 20 MHz is punctured
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0,
                   eht_oper_puncturing_override="0x0008")

def test_eht_5ghz_80p80mhz(dev, apdev):
    """EHT with 80+80 MHz channel width on 5 GHz"""
    _test_eht_5ghz(dev, apdev, 36, 3, 42, 155)

def _6ghz_op_class_to_bw(op):
    return {
        131: "20",
        132: "40",
        133: "80",
        134: "160",
        137: "320",
    }.get(op, "20")

def _test_eht_6ghz(dev, apdev, channel, op_class, ccfs1):
    check_sae_capab(dev[0])

    # CA enables 320 MHz channels without NO-IR restriction
    dev[0].cmd_execute(['iw', 'reg', 'set', 'CA'])
    wait_regdom_changes(dev[0])

    try:
        ssid = "eht_6ghz_sae"
        passphrase = "12345678"
        params = hostapd.he_wpa2_params(ssid=ssid, passphrase=passphrase)
        params["ieee80211be"] = "1"
        params["channel"] = str(channel)
        params["op_class"] = str(op_class)
        params["he_oper_centr_freq_seg0_idx"] = str(ccfs1)
        params["eht_oper_centr_freq_seg0_idx"] = str(ccfs1)
        params["country_code"] = "CA"

        if not he_6ghz_supported():
            raise HwsimSkip("6 GHz frequency is not supported")
        if op_class == 137 and not eht_320mhz_supported():
            raise HwsimSkip("320 MHz channels are not supported")

        hapd = hostapd.add_ap(apdev[0], params)
        status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(status))
        if hapd.get_status_field("ieee80211ax") != "1":
            raise Exception("STATUS did not indicate ieee80211ax=1")

        if hapd.get_status_field("ieee80211be") != "1":
            raise Exception("STATUS did not indicate ieee80211be=1")

        dev[0].set("sae_pwe", "1")

        freq = 5950 + channel * 5
        bw = _6ghz_op_class_to_bw(op_class)

        dev[0].connect(ssid, key_mgmt="SAE", psk=passphrase, ieee80211w="2",
                       scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(dev[0], hapd, freq, bw)
        eht_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)
        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected()
        hapd.wait_sta_disconnect()
        hapd.disable()
    finally:
        dev[0].set("sae_pwe", "0")
        dev[0].cmd_execute(['iw', 'reg', 'set', '00'])
        wait_regdom_changes(dev[0])

def test_eht_6ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, 5, 131, 5)

def test_eht_6ghz_40mhz(dev, apdev):
    """EHT with 40 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, 5, 132, 3)

def test_eht_6ghz_80mhz(dev, apdev):
    """EHT with 80 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, 5, 133, 7)

def test_eht_6ghz_160mhz(dev, apdev):
    """EHT with 160 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, 5, 134, 15)

def test_eht_6ghz_320mhz(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, 5, 137, 31)

def test_eht_6ghz_320mhz_2(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 63"""
    _test_eht_6ghz(dev, apdev, 37, 137, 63)

def test_eht_6ghz_320mhz_3(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 31 primary 37"""
    _test_eht_6ghz(dev, apdev, 37, 137, 31)