tests: Add basic MLD hwsim tests

Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski@intel.com>
This commit is contained in:
Andrei Otcheretianski 2023-05-22 22:34:12 +03:00 committed by Jouni Malinen
parent 5199cff4c7
commit f5592e2d5e
4 changed files with 379 additions and 7 deletions

View file

@ -117,3 +117,4 @@ CONFIG_DPP2=y
CONFIG_WEP=y CONFIG_WEP=y
CONFIG_PASN=y CONFIG_PASN=y
CONFIG_AIRTIME_POLICY=y CONFIG_AIRTIME_POLICY=y
CONFIG_IEEE80211BE=y

View file

@ -101,6 +101,11 @@ class HostapdGlobal:
if not ignore_error: if not ignore_error:
raise Exception("Could not add hostapd BSS") raise Exception("Could not add hostapd BSS")
def add_link(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
if "OK" not in res:
raise Exception("Could not add hostapd link")
def remove(self, ifname): def remove(self, ifname):
self.request("REMOVE " + ifname, timeout=30) self.request("REMOVE " + ifname, timeout=30)
@ -141,13 +146,14 @@ class HostapdGlobal:
self.host.send_file(src, dst) self.host.send_file(src, dst)
class Hostapd: class Hostapd:
def __init__(self, ifname, bssidx=0, hostname=None, port=8877): def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl,
port=8877):
self.hostname = hostname self.hostname = hostname
self.host = remotehost.Host(hostname, ifname) self.host = remotehost.Host(hostname, ifname)
self.ifname = ifname self.ifname = ifname
if hostname is None: if hostname is None:
self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname)) self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname)) self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.dbg = ifname self.dbg = ifname
else: else:
self.ctrl = wpaspy.Ctrl(hostname, port) self.ctrl = wpaspy.Ctrl(hostname, port)
@ -156,6 +162,7 @@ class Hostapd:
self.mon.attach() self.mon.attach()
self.bssid = None self.bssid = None
self.bssidx = bssidx self.bssidx = bssidx
self.mld_addr = None
def cmd_execute(self, cmd_array, shell=False): def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None: if self.hostname is None:
@ -184,8 +191,15 @@ class Hostapd:
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx) self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
return self.bssid return self.bssid
def own_mld_addr(self):
if self.mld_addr is None:
self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
return self.mld_addr
def get_addr(self, group=False): def get_addr(self, group=False):
if self.own_mld_addr() is None:
return self.own_addr() return self.own_addr()
return self.own_mld_addr()
def request(self, cmd): def request(self, cmd):
logger.debug(self.dbg + ": CTRL: " + cmd) logger.debug(self.dbg + ": CTRL: " + cmd)
@ -682,6 +696,37 @@ def add_iface(apdev, confname):
raise Exception("Could not ping hostapd") raise Exception("Could not ping hostapd")
return hapd return hapd
def add_mld_link(apdev, params):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
except:
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
else:
ifname = apdev
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
confname, ctrl_iface = cfg_mld_link_file(ifname, params)
hapd_global.send_file(confname, confname)
try:
hapd_global.add_link(ifname, confname)
except Exception as e:
if str(e) == "Could not add hostapd link":
raise utils.HwsimSkip("No MLO support in hostapd")
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def remove_bss(apdev, ifname=None): def remove_bss(apdev, ifname=None):
if ifname == None: if ifname == None:
ifname = apdev['ifname'] ifname = apdev['ifname']
@ -904,3 +949,32 @@ def cfg_file(apdev, conf, ifname=None):
return fname return fname
return conf return conf
idx = 0
def cfg_mld_link_file(ifname, params):
global idx
ctrl_iface="/var/run/hostapd"
conf = "link-%d.conf" % idx
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
if idx != 0:
ctrl_iface="/var/run/hostapd_%d" % idx
f.write("ctrl_interface=%s\n" % ctrl_iface)
f.write("driver=nl80211\n")
f.write("ieee80211n=1\n")
f.write("ieee80211ac=1\n")
f.write("ieee80211ax=1\n")
f.write("ieee80211be=1\n")
f.write("interface=%s\n" % ifname)
f.write("mld_ap=1\n")
f.write("mld_id=0\n")
for k, v in list(params.items()):
f.write("{}={}\n".format(k,v))
idx = idx + 1
return fname, ctrl_iface

View file

@ -17,6 +17,7 @@ HWSIM_ATTR_CHANNELS = 9
HWSIM_ATTR_RADIO_ID = 10 HWSIM_ATTR_RADIO_ID = 10
HWSIM_ATTR_SUPPORT_P2P_DEVICE = 14 HWSIM_ATTR_SUPPORT_P2P_DEVICE = 14
HWSIM_ATTR_USE_CHANCTX = 15 HWSIM_ATTR_USE_CHANCTX = 15
HWSIM_ATTR_MLO_SUPPORT = 25
# the controller class # the controller class
class HWSimController(object): class HWSimController(object):
@ -25,7 +26,7 @@ class HWSimController(object):
self._fid = netlink.genl_controller.get_family_id(b'MAC80211_HWSIM') self._fid = netlink.genl_controller.get_family_id(b'MAC80211_HWSIM')
def create_radio(self, n_channels=None, use_chanctx=False, def create_radio(self, n_channels=None, use_chanctx=False,
use_p2p_device=False): use_p2p_device=False, use_mlo=False):
attrs = [] attrs = []
if n_channels: if n_channels:
attrs.append(netlink.U32Attr(HWSIM_ATTR_CHANNELS, n_channels)) attrs.append(netlink.U32Attr(HWSIM_ATTR_CHANNELS, n_channels))
@ -33,6 +34,8 @@ class HWSimController(object):
attrs.append(netlink.FlagAttr(HWSIM_ATTR_USE_CHANCTX)) attrs.append(netlink.FlagAttr(HWSIM_ATTR_USE_CHANCTX))
if use_p2p_device: if use_p2p_device:
attrs.append(netlink.FlagAttr(HWSIM_ATTR_SUPPORT_P2P_DEVICE)) attrs.append(netlink.FlagAttr(HWSIM_ATTR_SUPPORT_P2P_DEVICE))
if use_mlo:
attrs.append(netlink.FlagAttr(HWSIM_ATTR_MLO_SUPPORT))
msg = netlink.GenlMessage(self._fid, HWSIM_CMD_CREATE_RADIO, msg = netlink.GenlMessage(self._fid, HWSIM_CMD_CREATE_RADIO,
flags=netlink.NLM_F_REQUEST | flags=netlink.NLM_F_REQUEST |
@ -50,17 +53,19 @@ class HWSimController(object):
class HWSimRadio(object): class HWSimRadio(object):
def __init__(self, n_channels=None, use_chanctx=False, def __init__(self, n_channels=None, use_chanctx=False,
use_p2p_device=False): use_p2p_device=False, use_mlo=False):
self._controller = HWSimController() self._controller = HWSimController()
self._n_channels = n_channels self._n_channels = n_channels
self._use_chanctx = use_chanctx self._use_chanctx = use_chanctx
self._use_p2p_dev = use_p2p_device self._use_p2p_dev = use_p2p_device
self._use_mlo = use_mlo
def __enter__(self): def __enter__(self):
self._radio_id = self._controller.create_radio( self._radio_id = self._controller.create_radio(
n_channels=self._n_channels, n_channels=self._n_channels,
use_chanctx=self._use_chanctx, use_chanctx=self._use_chanctx,
use_p2p_device=self._use_p2p_dev) use_p2p_device=self._use_p2p_dev,
use_mlo=self._use_mlo)
if self._radio_id < 0: if self._radio_id < 0:
raise Exception("Failed to create radio (err:%d)" % self._radio_id) raise Exception("Failed to create radio (err:%d)" % self._radio_id)
try: try:

View file

@ -6,6 +6,59 @@
import hostapd import hostapd
from utils import * from utils import *
from hwsim import HWSimRadio
import hwsim_utils
from wpasupplicant import WpaSupplicant
import re
def eht_verify_wifi_version(dev):
status = dev.get_status()
logger.info("station status: " + str(status))
if 'wifi_generation' not in status:
raise Exception("Missing wifi_generation information")
if status['wifi_generation'] != "7":
raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False,
mld=False):
status = hapd.get_status()
logger.info("hostapd STATUS: " + str(status))
if is_ht and status["ieee80211n"] != "1":
raise Exception("Unexpected STATUS ieee80211n value")
if is_vht and status["ieee80211ac"] != "1":
raise Exception("Unexpected STATUS ieee80211ac value")
if status["ieee80211ax"] != "1":
raise Exception("Unexpected STATUS ieee80211ax value")
if status["ieee80211be"] != "1":
raise Exception("Unexpected STATUS ieee80211be value")
sta = hapd.get_sta(wpas.own_addr())
logger.info("hostapd STA: " + str(sta))
if is_ht and "[HT]" not in sta['flags']:
raise Exception("Missing STA flag: HT")
if is_vht and "[VHT]" not in sta['flags']:
raise Exception("Missing STA flag: VHT")
if "[HE]" not in sta['flags']:
raise Exception("Missing STA flag: HE")
if "[EHT]" not in sta['flags']:
raise Exception("Missing STA flag: EHT")
sig = wpas.request("SIGNAL_POLL").splitlines()
# TODO: With MLD connection, signal poll logic is still not implemented.
# While mac80211 maintains the station using the MLD address, the
# information is maintained in the link stations, but it is not sent to
# user space yet.
if not mld:
if "FREQUENCY=%s" % freq not in sig:
raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
if "WIDTH=%s MHz" % bw not in sig:
raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
def traffic_test(wpas, hapd):
hwsim_utils.test_connectivity(wpas, hapd)
def test_eht_open(dev, apdev): def test_eht_open(dev, apdev):
"""EHT AP with open mode configuration""" """EHT AP with open mode configuration"""
@ -118,3 +171,242 @@ def test_eht_sae_mlo(dev, apdev):
finally: finally:
dev[0].set("sae_groups", "") dev[0].set("sae_groups", "")
dev[0].set("sae_pwe", "0") dev[0].set("sae_pwe", "0")
def eht_mld_enable_ap(iface, params):
hapd = hostapd.add_mld_link(iface, params)
hapd.enable()
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1)
if ev is None:
raise Exception("AP startup timed out")
if "AP-ENABLED" not in ev:
raise Exception("AP startup failed")
return hapd
def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256",
mfp="2", pwe=None, beacon_prot="1"):
params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
wpa_key_mgmt=key_mgmt, ieee80211w=mfp)
params['ieee80211n'] = '1'
params['ieee80211ax'] = '1'
params['ieee80211be'] = '1'
params['channel'] = '1'
params['hw_mode'] = 'g'
params['group_mgmt_cipher'] = "AES-128-CMAC"
params['beacon_prot'] = beacon_prot
if pwe is not None:
params['sae_pwe'] = pwe
return params
def test_eht_mld_discovery(dev, apdev):
"""EHT MLD AP discovery"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
ssid = "mld_ap"
link0_params = {"ssid": ssid,
"hw_mode": "g",
"channel": "1"}
link1_params = {"ssid": ssid,
"hw_mode": "g",
"channel": "2"}
hapd0 = eht_mld_enable_ap(hapd_iface, link0_params)
hapd1 = eht_mld_enable_ap(hapd_iface, link1_params)
res = wpas.request("SCAN freq=2412,2417")
if "FAIL" in res:
raise Exception("Failed to start scan")
ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
if ev is None:
raise Exception("Scan did not start")
ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
if ev is None:
raise Exception("Scan did not complete")
logger.info("Scan done")
rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
re.MULTILINE)
ml_pattern = re.compile(".*multi-link:.*, MLD ID=0x0", re.MULTILINE)
bss = wpas.request("BSS " + hapd0.own_addr())
logger.info("BSS 0: " + str(bss))
if rnr_pattern.search(bss) is None:
raise Exception("RNR element not found for first link")
if ml_pattern.search(bss) is None:
raise Exception("ML element not found for first link")
bss = wpas.request("BSS " + hapd1.own_addr())
logger.info("BSS 1: " + str(bss))
if rnr_pattern.search(bss) is None:
raise Exception("RNR element not found for second link")
if ml_pattern.search(bss) is None:
raise Exception("ML element not found for second link")
def test_eht_mld_owe_two_links(dev, apdev):
"""EHT MLD AP with MLD client OWE connection using two links"""
with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
ssid = "mld_ap_owe_two_link"
params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
hapd0 = eht_mld_enable_ap(hapd0_iface, params)
params['channel'] = '6'
hapd1 = eht_mld_enable_ap(hapd0_iface, params)
# Check legacy client connection
dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2")
wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
ieee80211w="2")
eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
eht_verify_wifi_version(wpas)
traffic_test(wpas, hapd0)
traffic_test(wpas, hapd1)
def test_eht_mld_sae_single_link(dev, apdev):
"""EHT MLD AP with MLD client SAE H2E connection using single link"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
passphrase = 'qwertyuiop'
ssid = "mld_ap_sae_single_link"
params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE",
mfp="2", pwe='2')
hapd0 = eht_mld_enable_ap(hapd_iface, params)
wpas.set("sae_pwe", "1")
wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
key_mgmt="SAE", ieee80211w="2")
eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
eht_verify_wifi_version(wpas)
traffic_test(wpas, hapd0)
def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1"):
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
passphrase = 'qwertyuiop'
ssid = "mld_ap_sae_two_link"
params = eht_mld_ap_wpa2_params(ssid, passphrase,
key_mgmt="SAE", mfp="2", pwe='1',
beacon_prot=beacon_prot)
hapd0 = eht_mld_enable_ap(hapd_iface, params)
params['channel'] = '6'
hapd1 = eht_mld_enable_ap(hapd_iface, params)
wpas.set("sae_pwe", "1")
wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
eht_verify_wifi_version(wpas)
traffic_test(wpas, hapd0)
traffic_test(wpas, hapd1)
def test_eht_mld_sae_two_links(dev, apdev):
"""EHT MLD AP with MLD client SAE H2E connection using two links"""
run_eht_mld_sae_two_links(dev, apdev)
def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev):
"""EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection"""
run_eht_mld_sae_two_links(dev, apdev, beacon_prot="0")
def test_eht_mld_sae_ext_one_link(dev, apdev):
"""EHT MLD AP with MLD client SAE-EXT H2E connection using single link"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
passphrase = 'qwertyuiop'
ssid = "mld_ap_sae_ext_single_link"
params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY")
hapd0 = eht_mld_enable_ap(hapd_iface, params)
wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
key_mgmt="SAE-EXT-KEY", ieee80211w="2")
eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
eht_verify_wifi_version(wpas)
traffic_test(wpas, hapd0)
def test_eht_mld_sae_ext_two_links(dev, apdev):
"""EHT MLD AP with MLD client SAE-EXT H2E connection using two links"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add(wpas_iface)
passphrase = 'qwertyuiop'
ssid = "mld_ap_sae_two_link"
params = eht_mld_ap_wpa2_params(ssid, passphrase,
key_mgmt="SAE-EXT-KEY")
hapd0 = eht_mld_enable_ap(hapd_iface, params)
params['channel'] = '6'
hapd1 = eht_mld_enable_ap(hapd_iface, params)
wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
key_mgmt="SAE-EXT-KEY", ieee80211w="2")
eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
eht_verify_wifi_version(wpas)
traffic_test(wpas, hapd0)
traffic_test(wpas, hapd1)
def test_eht_mld_sae_legacy_client(dev, apdev):
"""EHT MLD AP with legacy client SAE H2E connection"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
passphrase = 'qwertyuiop'
ssid = "mld_ap_sae_two_link"
params = eht_mld_ap_wpa2_params(ssid, passphrase,
key_mgmt="SAE", mfp="2", pwe='1')
hapd0 = eht_mld_enable_ap(hapd_iface, params)
params['channel'] = '6'
hapd1 = eht_mld_enable_ap(hapd_iface, params)
dev[0].set("sae_groups", "")
dev[0].set("sae_pwe", "1")
dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412",
key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
eht_verify_status(dev[0], hapd0, 2412, 20, is_ht=True)
traffic_test(dev[0], hapd0)