hostapd/tests/hwsim/hostapd.py
Aditya Kumar Singh 757d8d9aac tests: MLO: Use link ID to access control sockets
With MLO, each BSS will create sockets under the given ctrl_iface
directory with the socket name being '<ifname>_link<link_ID>'.

Make necessary changes in MLO related test cases so that it can access
the new socket and proceed further as expected.

Signed-off-by: Aditya Kumar Singh <quic_adisi@quicinc.com>
2024-09-01 11:17:11 +03:00

1090 lines
36 KiB
Python

# Python class for controlling hostapd
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import re
import time
import logging
import binascii
import struct
import tempfile
import wpaspy
import remotehost
import utils
import subprocess
from remotectrl import RemoteCtrl
logger = logging.getLogger()
hapd_ctrl = '/var/run/hostapd'
hapd_global = '/var/run/hostapd-global'
def mac2tuple(mac):
return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
class HostapdGlobal:
def __init__(self, apdev=None, global_ctrl_override=None):
try:
hostname = apdev['hostname']
port = apdev['port']
if 'remote_cli' in apdev:
remote_cli = apdev['remote_cli']
else:
remote_cli = False
except:
hostname = None
port = 8878
remote_cli = False
self.host = remotehost.Host(hostname)
self.hostname = hostname
self.port = port
self.remote_cli = remote_cli
if hostname is None:
global_ctrl = hapd_global
if global_ctrl_override:
global_ctrl = global_ctrl_override
self.ctrl = wpaspy.Ctrl(global_ctrl)
self.mon = wpaspy.Ctrl(global_ctrl)
self.dbg = ""
else:
if remote_cli:
global_ctrl = hapd_global
if global_ctrl_override:
global_ctrl = global_ctrl_override
self.ctrl = RemoteCtrl(global_ctrl, port, hostname=hostname)
self.mon = RemoteCtrl(global_ctrl, port, hostname=hostname)
self.dbg = hostname + "/global"
else:
self.ctrl = wpaspy.Ctrl(hostname, port)
self.mon = wpaspy.Ctrl(hostname, port)
self.dbg = hostname + "/" + str(port)
self.mon.attach()
def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None:
if shell:
cmd = ' '.join(cmd_array)
else:
cmd = cmd_array
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
stdout=subprocess.PIPE, shell=shell)
out = proc.communicate()[0]
ret = proc.returncode
return ret, out.decode()
else:
return self.host.execute(cmd_array)
def request(self, cmd, timeout=10):
logger.debug(self.dbg + ": CTRL(global): " + cmd)
return self.ctrl.request(cmd, timeout)
def wait_event(self, events, timeout):
start = os.times()[4]
while True:
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + "(global): " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.mon.pending(timeout=remaining):
break
return None
def add(self, ifname, driver=None):
cmd = "ADD " + ifname + " " + hapd_ctrl
if driver:
cmd += " " + driver
res = self.request(cmd)
if "OK" not in res:
raise Exception("Could not add hostapd interface " + ifname)
def add_iface(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
if "OK" not in res:
raise Exception("Could not add hostapd interface")
def add_bss(self, phy, confname, ignore_error=False):
res = self.request("ADD bss_config=" + phy + ":" + confname)
if "OK" not in res:
if not ignore_error:
raise Exception("Could not add hostapd BSS")
def add_link(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
if "OK" not in res:
raise Exception("Could not add hostapd link")
def remove(self, ifname):
self.request("REMOVE " + ifname, timeout=30)
def relog(self):
self.request("RELOG")
def flush(self):
self.request("FLUSH")
def get_ctrl_iface_port(self, ifname):
if self.hostname is None:
return None
if self.remote_cli:
return None
res = self.request("INTERFACES ctrl")
lines = res.splitlines()
found = False
for line in lines:
words = line.split()
if words[0] == ifname:
found = True
break
if not found:
raise Exception("Could not find UDP port for " + ifname)
res = line.find("ctrl_iface=udp:")
if res == -1:
raise Exception("Wrong ctrl_interface format")
words = line.split(":")
return int(words[1])
def terminate(self):
self.mon.detach()
self.mon.close()
self.mon = None
self.ctrl.terminate()
self.ctrl = None
def send_file(self, src, dst):
self.host.send_file(src, dst)
class Hostapd:
def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl,
port=8877, remote_cli=False, link=None):
self.hostname = hostname
self.host = remotehost.Host(hostname, ifname)
self.ifname = ifname
self.remote_cli = remote_cli
if hostname is None:
if link is not None:
ifname = ifname + "_link" + str(link)
self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.dbg = ifname
else:
if remote_cli:
self.ctrl = RemoteCtrl(ctrl, port, hostname=hostname,
ifname=ifname)
self.mon = RemoteCtrl(ctrl, port, hostname=hostname,
ifname=ifname)
else:
self.ctrl = wpaspy.Ctrl(hostname, port)
self.mon = wpaspy.Ctrl(hostname, port)
self.dbg = hostname + "/" + ifname
self.mon.attach()
self.bssid = None
self.bssidx = bssidx
self.mld_addr = None
def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None:
if shell:
cmd = ' '.join(cmd_array)
else:
cmd = cmd_array
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
stdout=subprocess.PIPE, shell=shell)
out = proc.communicate()[0]
ret = proc.returncode
return ret, out.decode()
else:
return self.host.execute(cmd_array)
def close_ctrl(self):
if self.mon is not None:
self.mon.detach()
self.mon.close()
self.mon = None
self.ctrl.close()
self.ctrl = None
def own_addr(self):
if self.bssid is None:
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
return self.bssid
def own_mld_addr(self):
if self.mld_addr is None:
self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
return self.mld_addr
def get_addr(self, group=False):
if self.own_mld_addr() is None:
return self.own_addr()
return self.own_mld_addr()
def request(self, cmd):
logger.debug(self.dbg + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
def ping(self):
return "PONG" in self.request("PING")
def set(self, field, value):
if "OK" not in self.request("SET " + field + " " + value):
if "TKIP" in value and (field == "wpa_pairwise" or \
field == "rsn_pairwise"):
raise utils.HwsimSkip("Cipher TKIP not supported")
raise Exception("Failed to set hostapd parameter " + field)
def set_defaults(self, set_channel=True):
self.set("driver", "nl80211")
if set_channel:
self.set("hw_mode", "g")
self.set("channel", "1")
self.set("ieee80211n", "1")
self.set("logger_stdout", "-1")
self.set("logger_stdout_level", "0")
def set_open(self, ssid):
self.set_defaults()
self.set("ssid", ssid)
def set_wpa2_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "2")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("rsn_pairwise", "CCMP")
def set_wpa_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "1")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
def set_wpa_psk_mixed(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "3")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
self.set("rsn_pairwise", "CCMP")
def set_wep(self, ssid, key):
self.set_defaults()
self.set("ssid", ssid)
self.set("wep_key0", key)
def enable(self):
if "OK" not in self.request("ENABLE"):
raise Exception("Failed to enable hostapd interface " + self.ifname)
def disable(self):
if "OK" not in self.request("DISABLE"):
raise Exception("Failed to disable hostapd interface " + self.ifname)
def link_remove(self, count=10):
if "OK" not in self.request("LINK_REMOVE %u" % count):
raise Exception("Failed to remove hostapd link " + self.ifname)
def dump_monitor(self):
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + ": " + ev)
def wait_event(self, events, timeout):
if not isinstance(events, list):
raise Exception("Hostapd.wait_event() called with incorrect events argument type")
start = os.times()[4]
while True:
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + ": " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.mon.pending(timeout=remaining):
break
return None
def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False):
ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
if ev is None:
raise Exception("AP did not report STA connection")
if addr and addr not in ev:
raise Exception("Unexpected STA address in connection event: " + ev)
if wait_4way_hs:
ev2 = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"],
timeout=timeout)
if ev2 is None:
raise Exception("AP did not report 4-way handshake completion")
if addr and addr not in ev2:
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2)
return ev
def wait_sta_disconnect(self, addr=None, timeout=2):
ev = self.wait_event(["AP-STA-DISCONNECT"], timeout=timeout)
if ev is None:
raise Exception("AP did not report STA disconnection")
if addr and addr not in ev:
raise Exception("Unexpected STA address in disconnection event: " + ev)
return ev
def wait_4way_hs(self, addr=None, timeout=1):
ev = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=timeout)
if ev is None:
raise Exception("hostapd did not report 4-way handshake completion")
if addr and addr not in ev:
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev)
return ev
def wait_ptkinitdone(self, addr, timeout=2):
while timeout > 0:
sta = self.get_sta(addr)
if 'hostapdWPAPTKState' not in sta:
raise Exception("GET_STA did not return hostapdWPAPTKState")
state = sta['hostapdWPAPTKState']
if state == "11":
return
time.sleep(0.1)
timeout -= 0.1
raise Exception("Timeout while waiting for PTKINITDONE")
def get_status(self):
res = self.request("STATUS")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_status_field(self, field):
vals = self.get_status()
if field in vals:
return vals[field]
return None
def get_driver_status(self):
res = self.request("STATUS-DRIVER")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_driver_status_field(self, field):
vals = self.get_driver_status()
if field in vals:
return vals[field]
return None
def get_config(self):
res = self.request("GET_CONFIG")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def mgmt_rx(self, timeout=5):
ev = self.wait_event(["MGMT-RX"], timeout=timeout)
if ev is None:
return None
msg = {}
frame = binascii.unhexlify(ev.split(' ')[1])
msg['frame'] = frame
hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
msg['fc'] = hdr[0]
msg['subtype'] = (hdr[0] >> 4) & 0xf
hdr = hdr[1:]
msg['duration'] = hdr[0]
hdr = hdr[1:]
msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['seq_ctrl'] = hdr[0]
msg['payload'] = frame[24:]
return msg
def mgmt_tx(self, msg):
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
hdr = struct.pack('<HH6B6B6BH', *t)
res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
if "OK" not in res:
raise Exception("MGMT_TX command to hostapd failed")
def get_sta(self, addr, info=None, next=False):
cmd = "STA-NEXT " if next else "STA "
if addr is None:
res = self.request("STA-FIRST")
elif info:
res = self.request(cmd + addr + " " + info)
else:
res = self.request(cmd + addr)
lines = res.splitlines()
vals = dict()
first = True
for l in lines:
if first and '=' not in l:
vals['addr'] = l
first = False
else:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_mib(self, param=None):
if param:
res = self.request("MIB " + param)
else:
res = self.request("MIB")
lines = res.splitlines()
vals = dict()
for l in lines:
name_val = l.split('=', 1)
if len(name_val) > 1:
vals[name_val[0]] = name_val[1]
return vals
def get_pmksa(self, addr):
res = self.request("PMKSA")
lines = res.splitlines()
for l in lines:
if addr not in l:
continue
vals = dict()
[index, aa, pmkid, expiration, opportunistic] = l.split(' ')
vals['index'] = index
vals['pmkid'] = pmkid
vals['expiration'] = expiration
vals['opportunistic'] = opportunistic
return vals
return None
def dpp_qr_code(self, uri):
res = self.request("DPP_QR_CODE " + uri)
if "FAIL" in res:
raise Exception("Failed to parse QR Code URI")
return int(res)
def dpp_nfc_uri(self, uri):
res = self.request("DPP_NFC_URI " + uri)
if "FAIL" in res:
raise Exception("Failed to parse NFC URI")
return int(res)
def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
curve=None, key=None, supported_curves=None,
host=None):
cmd = "DPP_BOOTSTRAP_GEN type=" + type
if chan:
cmd += " chan=" + chan
if mac:
if mac is True:
mac = self.own_addr()
cmd += " mac=" + mac.replace(':', '')
if info:
cmd += " info=" + info
if curve:
cmd += " curve=" + curve
if key:
cmd += " key=" + key
if supported_curves:
cmd += " supported_curves=" + supported_curves
if host:
cmd += " host=" + host
res = self.request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
return int(res)
def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None,
extra=None):
cmd = "DPP_BOOTSTRAP_SET %d" % id
if ssid:
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
if extra:
cmd += " " + extra
if conf:
cmd += " conf=" + conf
if configurator is not None:
cmd += " configurator=%d" % configurator
if "OK" not in self.request(cmd):
raise Exception("Failed to set bootstrapping parameters")
def dpp_listen(self, freq, netrole=None, qr=None, role=None):
cmd = "DPP_LISTEN " + str(freq)
if netrole:
cmd += " netrole=" + netrole
if qr:
cmd += " qr=" + qr
if role:
cmd += " role=" + role
if "OK" not in self.request(cmd):
raise Exception("Failed to start listen operation")
def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
extra=None, own=None, role=None, neg_freq=None,
ssid=None, passphrase=None, expect_fail=False,
conn_status=False, nfc_uri=None):
cmd = "DPP_AUTH_INIT"
if peer is None:
if nfc_uri:
peer = self.dpp_nfc_uri(nfc_uri)
else:
peer = self.dpp_qr_code(uri)
cmd += " peer=%d" % peer
if own is not None:
cmd += " own=%d" % own
if role:
cmd += " role=" + role
if extra:
cmd += " " + extra
if conf:
cmd += " conf=" + conf
if configurator is not None:
cmd += " configurator=%d" % configurator
if neg_freq:
cmd += " neg_freq=%d" % neg_freq
if ssid:
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
if passphrase:
cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
if conn_status:
cmd += " conn_status=1"
res = self.request(cmd)
if expect_fail:
if "FAIL" not in res:
raise Exception("DPP authentication started unexpectedly")
return
if "OK" not in res:
raise Exception("Failed to initiate DPP Authentication")
def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
extra=None, use_id=None, ver=None):
if use_id is None:
id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
else:
id1 = use_id
cmd = "own=%d " % id1
if identifier:
cmd += "identifier=%s " % identifier
cmd += "init=1 "
if ver is not None:
cmd += "ver=" + str(ver) + " "
if role:
cmd += "role=%s " % role
if extra:
cmd += extra + " "
cmd += "code=%s" % code
res = self.request("DPP_PKEX_ADD " + cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (initiator)")
return id1
def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
listen_role=None):
id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
cmd = "own=%d " % id0
if identifier:
cmd += "identifier=%s " % identifier
cmd += "code=%s" % code
res = self.request("DPP_PKEX_ADD " + cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
self.dpp_listen(freq, role=listen_role)
def dpp_configurator_add(self, curve=None, key=None,
net_access_key_curve=None):
cmd = "DPP_CONFIGURATOR_ADD"
if curve:
cmd += " curve=" + curve
if net_access_key_curve:
cmd += " net_access_key_curve=" + curve
if key:
cmd += " key=" + key
res = self.request(cmd)
if "FAIL" in res:
raise Exception("Failed to add configurator")
return int(res)
def dpp_configurator_remove(self, conf_id):
res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
if "OK" not in res:
raise Exception("DPP_CONFIGURATOR_REMOVE failed")
def note(self, txt):
self.request("NOTE " + txt)
def send_file(self, src, dst):
self.host.send_file(src, dst)
def get_ptksa(self, bssid, cipher):
res = self.request("PTKSA_CACHE_LIST")
lines = res.splitlines()
for l in lines:
if bssid not in l or cipher not in l:
continue
vals = dict()
[index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5)
vals['index'] = index
vals['addr'] = addr
vals['cipher'] = cipher
vals['expiration'] = expiration
vals['tk'] = tk
vals['kdk'] = kdk
return vals
return None
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
global_ctrl_override=None, driver=False, set_channel=True):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
if 'remote_cli' in apdev:
remote_cli = apdev['remote_cli']
else:
remote_cli = False
if 'global_ctrl_override' in apdev:
global_ctrl_override = apdev['global_ctrl_override']
logger.info("Starting AP " + hostname + "/" + port + " " + ifname + " remote_cli " + str(remote_cli))
except:
logger.info("Starting AP " + ifname)
hostname = None
port = 8878
remote_cli = False
else:
ifname = apdev
logger.info("Starting AP " + ifname + " (old add_ap argument type)")
hostname = None
port = 8878
remote_cli = False
hapd_global = HostapdGlobal(apdev,
global_ctrl_override=global_ctrl_override)
hapd_global.remove(ifname)
hapd_global.add(ifname, driver=driver)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port,
remote_cli=remote_cli)
if not hapd.ping():
raise Exception("Could not ping hostapd")
hapd.set_defaults(set_channel=set_channel)
fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
"wpa", "wpa_deny_ptk0_rekey",
"wpa_pairwise", "rsn_pairwise", "auth_server_addr",
"acct_server_addr", "osu_server_uri"]
for field in fields:
if field in params:
hapd.set(field, params[field])
for f, v in list(params.items()):
if f in fields:
continue
if isinstance(v, list):
for val in v:
hapd.set(f, val)
else:
hapd.set(f, v)
if no_enable:
return hapd
hapd.enable()
if wait_enabled:
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
if ev is None:
raise Exception("AP startup timed out")
if "AP-ENABLED" not in ev:
raise Exception("AP startup failed")
return hapd
def add_bss(apdev, ifname, confname, ignore_error=False):
phy = utils.get_phy(apdev)
try:
hostname = apdev['hostname']
port = apdev['port']
if 'remote_cli' in apdev:
remote_cli = apdev['remote_cli']
else:
remote_cli = False
logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname + " remote_cli=" + str(remote_cli))
except:
logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
hostname = None
port = 8878
remote_cli = False
hapd_global = HostapdGlobal(apdev)
confname = cfg_file(apdev, confname, ifname)
hapd_global.send_file(confname, confname)
hapd_global.add_bss(phy, confname, ignore_error)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port, remote_cli=remote_cli)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def add_iface(apdev, confname):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
if 'remote_cli' in apdev:
remote_cli = apdev['remote_cli']
else:
remote_cli = False
logger.info("Starting interface " + hostname + "/" + port + " " + ifname + "remote_cli=" + str(remote_cli))
except:
logger.info("Starting interface " + ifname)
hostname = None
port = 8878
remote_cli = False
hapd_global = HostapdGlobal(apdev)
confname = cfg_file(apdev, confname, ifname)
hapd_global.send_file(confname, confname)
hapd_global.add_iface(ifname, confname)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port, remote_cli=remote_cli)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def add_mld_link(apdev, link_id, params):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
except:
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
else:
ifname = apdev
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
confname, ctrl_iface = cfg_mld_link_file(ifname, params)
hapd_global.send_file(confname, confname)
try:
hapd_global.add_link(ifname, confname)
except Exception as e:
if str(e) == "Could not add hostapd link":
raise utils.HwsimSkip("No MLO support in hostapd")
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port,
link=link_id)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def remove_bss(apdev, ifname=None):
if ifname == None:
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
except:
logger.info("Removing BSS " + ifname)
hapd_global = HostapdGlobal(apdev)
hapd_global.remove(ifname)
# wait little to make sure the AP stops beaconing
time.sleep(0.1)
def terminate(apdev):
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Terminating hostapd " + hostname + "/" + port)
except:
logger.info("Terminating hostapd")
hapd_global = HostapdGlobal(apdev)
hapd_global.terminate()
def wpa3_params(ssid=None, password=None, wpa_key_mgmt="SAE",
ieee80211w="2"):
params = {"wpa": "2",
"wpa_key_mgmt": wpa_key_mgmt,
"ieee80211w": ieee80211w,
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if password:
params["sae_password"] = password
return params
def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
ieee80211w=None):
params = {"wpa": "2",
"wpa_key_mgmt": wpa_key_mgmt,
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
if ieee80211w is not None:
params["ieee80211w"] = ieee80211w
return params
def wpa_params(ssid=None, passphrase=None):
params = {"wpa": "1",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def wpa_mixed_params(ssid=None, passphrase=None):
params = {"wpa": "3",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP",
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def radius_params():
params = {"auth_server_addr": "127.0.0.1",
"auth_server_port": "1812",
"auth_server_shared_secret": "radius",
"nas_identifier": "nas.w1.fi"}
return params
def wpa_eap_params(ssid=None):
params = radius_params()
params["wpa"] = "1"
params["wpa_key_mgmt"] = "WPA-EAP"
params["wpa_pairwise"] = "TKIP"
params["ieee8021x"] = "1"
if ssid:
params["ssid"] = ssid
return params
def wpa2_eap_params(ssid=None):
params = radius_params()
params["wpa"] = "2"
params["wpa_key_mgmt"] = "WPA-EAP"
params["rsn_pairwise"] = "CCMP"
params["ieee8021x"] = "1"
if ssid:
params["ssid"] = ssid
return params
def b_only_params(channel="1", ssid=None, country=None):
params = {"hw_mode": "b",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def g_only_params(channel="1", ssid=None, country=None):
params = {"hw_mode": "g",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def a_only_params(channel="36", ssid=None, country=None):
params = {"hw_mode": "a",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def ht20_params(channel="1", ssid=None, country=None):
params = {"ieee80211n": "1",
"channel": channel,
"hw_mode": "g"}
if int(channel) > 14:
params["hw_mode"] = "a"
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def ht40_plus_params(channel="1", ssid=None, country=None):
params = ht20_params(channel, ssid, country)
params['ht_capab'] = "[HT40+]"
return params
def ht40_minus_params(channel="1", ssid=None, country=None):
params = ht20_params(channel, ssid, country)
params['ht_capab'] = "[HT40-]"
return params
def he_params(ssid=None):
params = {"ssid": "he6ghz",
"ieee80211n": "1",
"ieee80211ac": "1",
"wmm_enabled": "1",
"channel": "5",
"op_class": "131",
"ieee80211ax": "1",
"hw_mode": "a",
"he_oper_centr_freq_seg0_idx": "15",
"he_oper_chwidth": "2",
"vht_oper_chwidth": "2"}
if ssid:
params["ssid"] = ssid
return params
def he_wpa2_params(ssid=None, wpa_key_mgmt="SAE", rsn_pairwise="CCMP",
group_cipher="CCMP", sae_pwe="1", passphrase=None):
params = he_params(ssid)
params["wpa"] = "2"
params["wpa_key_mgmt"] = wpa_key_mgmt
params["rsn_pairwise"] = rsn_pairwise
params["group_cipher"] = group_cipher
params["ieee80211w"] = "2"
if "SAE" in wpa_key_mgmt:
params["sae_pwe"] = sae_pwe
params["sae_groups"] = "19"
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def cmd_execute(apdev, cmd, shell=False):
hapd_global = HostapdGlobal(apdev)
return hapd_global.cmd_execute(cmd, shell=shell)
def send_file(apdev, src, dst):
hapd_global = HostapdGlobal(apdev)
return hapd_global.send_file(src, dst)
def acl_file(dev, apdev, conf):
fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
if conf == 'hostapd.macaddr':
mac0 = dev[0].get_status_field("address")
f.write(mac0 + '\n')
f.write("02:00:00:00:00:12\n")
f.write("02:00:00:00:00:34\n")
f.write("-02:00:00:00:00:12\n")
f.write("-02:00:00:00:00:34\n")
f.write("01:01:01:01:01:01\n")
f.write("03:01:01:01:01:03\n")
elif conf == 'hostapd.accept':
mac0 = dev[0].get_status_field("address")
mac1 = dev[1].get_status_field("address")
f.write(mac0 + " 1\n")
f.write(mac1 + " 2\n")
elif conf == 'hostapd.accept2':
mac0 = dev[0].get_status_field("address")
mac1 = dev[1].get_status_field("address")
mac2 = dev[2].get_status_field("address")
f.write(mac0 + " 1\n")
f.write(mac1 + " 2\n")
f.write(mac2 + " 3\n")
else:
f.close()
os.unlink(filename)
return conf
return filename
def bssid_inc(apdev, inc=1):
parts = apdev['bssid'].split(':')
parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
parts[3], parts[4], parts[5])
return bssid
def cfg_file(apdev, conf, ifname=None):
match = re.search(r'^bss-.+', conf)
if match:
# put cfg file in /tmp directory
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
if ifname is None:
ifname = apdev['ifname']
if idx != '1':
ifname = ifname + '-' + idx
f.write("driver=nl80211\n")
f.write("ctrl_interface=/var/run/hostapd\n")
f.write("hw_mode=g\n")
f.write("channel=1\n")
f.write("ieee80211n=1\n")
if conf.startswith('bss-ht40-'):
f.write("ht_capab=[HT40+]\n")
f.write("interface=%s\n" % ifname)
f.write("ssid=bss-%s\n" % idx)
if conf == 'bss-2-dup.conf':
bssid = apdev['bssid']
else:
bssid = bssid_inc(apdev, int(idx) - 1)
f.write("bssid=%s\n" % bssid)
return fname
return conf
idx = 0
def cfg_mld_link_file(ifname, params):
global idx
ctrl_iface="/var/run/hostapd"
conf = "link-%d.conf" % idx
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
f.write("ctrl_interface=%s\n" % ctrl_iface)
f.write("driver=nl80211\n")
f.write("ieee80211n=1\n")
if 'hw_mode' in params and params['hw_mode'] == 'a' and \
('op_class' not in params or \
int(params['op_class']) not in [131, 132, 133, 134, 135, 136, 137]):
f.write("ieee80211ac=1\n")
f.write("ieee80211ax=1\n")
f.write("ieee80211be=1\n")
f.write("interface=%s\n" % ifname)
f.write("mld_ap=1\n")
for k, v in list(params.items()):
f.write("{}={}\n".format(k,v))
idx = idx + 1
return fname, ctrl_iface