hostapd/tests/hwsim/hostapd.py
Jouni Malinen 9579d4eff9 tests: Wait for AP/GO event in addition to STA
Wait for AP/GO to complete processing before taking the next step in a
test instead of waiting just for STA. This avoids race conditions with
UML time-travel.

Signed-off-by: Jouni Malinen <j@w1.fi>
2023-12-10 19:40:49 +02:00

1007 lines
33 KiB
Python

# Python class for controlling hostapd
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import re
import time
import logging
import binascii
import struct
import tempfile
import wpaspy
import remotehost
import utils
import subprocess
logger = logging.getLogger()
hapd_ctrl = '/var/run/hostapd'
hapd_global = '/var/run/hostapd-global'
def mac2tuple(mac):
return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
class HostapdGlobal:
def __init__(self, apdev=None, global_ctrl_override=None):
try:
hostname = apdev['hostname']
port = apdev['port']
except:
hostname = None
port = 8878
self.host = remotehost.Host(hostname)
self.hostname = hostname
self.port = port
if hostname is None:
global_ctrl = hapd_global
if global_ctrl_override:
global_ctrl = global_ctrl_override
self.ctrl = wpaspy.Ctrl(global_ctrl)
self.mon = wpaspy.Ctrl(global_ctrl)
self.dbg = ""
else:
self.ctrl = wpaspy.Ctrl(hostname, port)
self.mon = wpaspy.Ctrl(hostname, port)
self.dbg = hostname + "/" + str(port)
self.mon.attach()
def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None:
if shell:
cmd = ' '.join(cmd_array)
else:
cmd = cmd_array
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
stdout=subprocess.PIPE, shell=shell)
out = proc.communicate()[0]
ret = proc.returncode
return ret, out.decode()
else:
return self.host.execute(cmd_array)
def request(self, cmd, timeout=10):
logger.debug(self.dbg + ": CTRL(global): " + cmd)
return self.ctrl.request(cmd, timeout)
def wait_event(self, events, timeout):
start = os.times()[4]
while True:
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + "(global): " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.mon.pending(timeout=remaining):
break
return None
def add(self, ifname, driver=None):
cmd = "ADD " + ifname + " " + hapd_ctrl
if driver:
cmd += " " + driver
res = self.request(cmd)
if "OK" not in res:
raise Exception("Could not add hostapd interface " + ifname)
def add_iface(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
if "OK" not in res:
raise Exception("Could not add hostapd interface")
def add_bss(self, phy, confname, ignore_error=False):
res = self.request("ADD bss_config=" + phy + ":" + confname)
if "OK" not in res:
if not ignore_error:
raise Exception("Could not add hostapd BSS")
def add_link(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
if "OK" not in res:
raise Exception("Could not add hostapd link")
def remove(self, ifname):
self.request("REMOVE " + ifname, timeout=30)
def relog(self):
self.request("RELOG")
def flush(self):
self.request("FLUSH")
def get_ctrl_iface_port(self, ifname):
if self.hostname is None:
return None
res = self.request("INTERFACES ctrl")
lines = res.splitlines()
found = False
for line in lines:
words = line.split()
if words[0] == ifname:
found = True
break
if not found:
raise Exception("Could not find UDP port for " + ifname)
res = line.find("ctrl_iface=udp:")
if res == -1:
raise Exception("Wrong ctrl_interface format")
words = line.split(":")
return int(words[1])
def terminate(self):
self.mon.detach()
self.mon.close()
self.mon = None
self.ctrl.terminate()
self.ctrl = None
def send_file(self, src, dst):
self.host.send_file(src, dst)
class Hostapd:
def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl,
port=8877):
self.hostname = hostname
self.host = remotehost.Host(hostname, ifname)
self.ifname = ifname
if hostname is None:
self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
self.dbg = ifname
else:
self.ctrl = wpaspy.Ctrl(hostname, port)
self.mon = wpaspy.Ctrl(hostname, port)
self.dbg = hostname + "/" + ifname
self.mon.attach()
self.bssid = None
self.bssidx = bssidx
self.mld_addr = None
def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None:
if shell:
cmd = ' '.join(cmd_array)
else:
cmd = cmd_array
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
stdout=subprocess.PIPE, shell=shell)
out = proc.communicate()[0]
ret = proc.returncode
return ret, out.decode()
else:
return self.host.execute(cmd_array)
def close_ctrl(self):
if self.mon is not None:
self.mon.detach()
self.mon.close()
self.mon = None
self.ctrl.close()
self.ctrl = None
def own_addr(self):
if self.bssid is None:
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
return self.bssid
def own_mld_addr(self):
if self.mld_addr is None:
self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
return self.mld_addr
def get_addr(self, group=False):
if self.own_mld_addr() is None:
return self.own_addr()
return self.own_mld_addr()
def request(self, cmd):
logger.debug(self.dbg + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
def ping(self):
return "PONG" in self.request("PING")
def set(self, field, value):
if "OK" not in self.request("SET " + field + " " + value):
if "TKIP" in value and (field == "wpa_pairwise" or \
field == "rsn_pairwise"):
raise utils.HwsimSkip("Cipher TKIP not supported")
raise Exception("Failed to set hostapd parameter " + field)
def set_defaults(self, set_channel=True):
self.set("driver", "nl80211")
if set_channel:
self.set("hw_mode", "g")
self.set("channel", "1")
self.set("ieee80211n", "1")
self.set("logger_stdout", "-1")
self.set("logger_stdout_level", "0")
def set_open(self, ssid):
self.set_defaults()
self.set("ssid", ssid)
def set_wpa2_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "2")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("rsn_pairwise", "CCMP")
def set_wpa_psk(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "1")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
def set_wpa_psk_mixed(self, ssid, passphrase):
self.set_defaults()
self.set("ssid", ssid)
self.set("wpa_passphrase", passphrase)
self.set("wpa", "3")
self.set("wpa_key_mgmt", "WPA-PSK")
self.set("wpa_pairwise", "TKIP")
self.set("rsn_pairwise", "CCMP")
def set_wep(self, ssid, key):
self.set_defaults()
self.set("ssid", ssid)
self.set("wep_key0", key)
def enable(self):
if "OK" not in self.request("ENABLE"):
raise Exception("Failed to enable hostapd interface " + self.ifname)
def disable(self):
if "OK" not in self.request("DISABLE"):
raise Exception("Failed to disable hostapd interface " + self.ifname)
def link_remove(self, count=10):
if "OK" not in self.request("LINK_REMOVE %u" % count):
raise Exception("Failed to remove hostapd link " + self.ifname)
def dump_monitor(self):
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + ": " + ev)
def wait_event(self, events, timeout):
if not isinstance(events, list):
raise Exception("Hostapd.wait_event() called with incorrect events argument type")
start = os.times()[4]
while True:
while self.mon.pending():
ev = self.mon.recv()
logger.debug(self.dbg + ": " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.mon.pending(timeout=remaining):
break
return None
def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False):
ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
if ev is None:
raise Exception("AP did not report STA connection")
if addr and addr not in ev:
raise Exception("Unexpected STA address in connection event: " + ev)
if wait_4way_hs:
ev2 = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"],
timeout=timeout)
if ev2 is None:
raise Exception("AP did not report 4-way handshake completion")
if addr and addr not in ev2:
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2)
return ev
def wait_sta_disconnect(self, addr=None, timeout=2):
ev = self.wait_event(["AP-STA-DISCONNECT"], timeout=timeout)
if ev is None:
raise Exception("AP did not report STA disconnection")
if addr and addr not in ev:
raise Exception("Unexpected STA address in disconnection event: " + ev)
return ev
def wait_4way_hs(self, addr=None, timeout=1):
ev = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=timeout)
if ev is None:
raise Exception("hostapd did not report 4-way handshake completion")
if addr and addr not in ev:
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev)
return ev
def wait_ptkinitdone(self, addr, timeout=2):
while timeout > 0:
sta = self.get_sta(addr)
if 'hostapdWPAPTKState' not in sta:
raise Exception("GET_STA did not return hostapdWPAPTKState")
state = sta['hostapdWPAPTKState']
if state == "11":
return
time.sleep(0.1)
timeout -= 0.1
raise Exception("Timeout while waiting for PTKINITDONE")
def get_status(self):
res = self.request("STATUS")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_status_field(self, field):
vals = self.get_status()
if field in vals:
return vals[field]
return None
def get_driver_status(self):
res = self.request("STATUS-DRIVER")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_driver_status_field(self, field):
vals = self.get_driver_status()
if field in vals:
return vals[field]
return None
def get_config(self):
res = self.request("GET_CONFIG")
lines = res.splitlines()
vals = dict()
for l in lines:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def mgmt_rx(self, timeout=5):
ev = self.wait_event(["MGMT-RX"], timeout=timeout)
if ev is None:
return None
msg = {}
frame = binascii.unhexlify(ev.split(' ')[1])
msg['frame'] = frame
hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
msg['fc'] = hdr[0]
msg['subtype'] = (hdr[0] >> 4) & 0xf
hdr = hdr[1:]
msg['duration'] = hdr[0]
hdr = hdr[1:]
msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
hdr = hdr[6:]
msg['seq_ctrl'] = hdr[0]
msg['payload'] = frame[24:]
return msg
def mgmt_tx(self, msg):
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
hdr = struct.pack('<HH6B6B6BH', *t)
res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
if "OK" not in res:
raise Exception("MGMT_TX command to hostapd failed")
def get_sta(self, addr, info=None, next=False):
cmd = "STA-NEXT " if next else "STA "
if addr is None:
res = self.request("STA-FIRST")
elif info:
res = self.request(cmd + addr + " " + info)
else:
res = self.request(cmd + addr)
lines = res.splitlines()
vals = dict()
first = True
for l in lines:
if first and '=' not in l:
vals['addr'] = l
first = False
else:
[name, value] = l.split('=', 1)
vals[name] = value
return vals
def get_mib(self, param=None):
if param:
res = self.request("MIB " + param)
else:
res = self.request("MIB")
lines = res.splitlines()
vals = dict()
for l in lines:
name_val = l.split('=', 1)
if len(name_val) > 1:
vals[name_val[0]] = name_val[1]
return vals
def get_pmksa(self, addr):
res = self.request("PMKSA")
lines = res.splitlines()
for l in lines:
if addr not in l:
continue
vals = dict()
[index, aa, pmkid, expiration, opportunistic] = l.split(' ')
vals['index'] = index
vals['pmkid'] = pmkid
vals['expiration'] = expiration
vals['opportunistic'] = opportunistic
return vals
return None
def dpp_qr_code(self, uri):
res = self.request("DPP_QR_CODE " + uri)
if "FAIL" in res:
raise Exception("Failed to parse QR Code URI")
return int(res)
def dpp_nfc_uri(self, uri):
res = self.request("DPP_NFC_URI " + uri)
if "FAIL" in res:
raise Exception("Failed to parse NFC URI")
return int(res)
def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
curve=None, key=None, supported_curves=None,
host=None):
cmd = "DPP_BOOTSTRAP_GEN type=" + type
if chan:
cmd += " chan=" + chan
if mac:
if mac is True:
mac = self.own_addr()
cmd += " mac=" + mac.replace(':', '')
if info:
cmd += " info=" + info
if curve:
cmd += " curve=" + curve
if key:
cmd += " key=" + key
if supported_curves:
cmd += " supported_curves=" + supported_curves
if host:
cmd += " host=" + host
res = self.request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
return int(res)
def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None,
extra=None):
cmd = "DPP_BOOTSTRAP_SET %d" % id
if ssid:
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
if extra:
cmd += " " + extra
if conf:
cmd += " conf=" + conf
if configurator is not None:
cmd += " configurator=%d" % configurator
if "OK" not in self.request(cmd):
raise Exception("Failed to set bootstrapping parameters")
def dpp_listen(self, freq, netrole=None, qr=None, role=None):
cmd = "DPP_LISTEN " + str(freq)
if netrole:
cmd += " netrole=" + netrole
if qr:
cmd += " qr=" + qr
if role:
cmd += " role=" + role
if "OK" not in self.request(cmd):
raise Exception("Failed to start listen operation")
def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
extra=None, own=None, role=None, neg_freq=None,
ssid=None, passphrase=None, expect_fail=False,
conn_status=False, nfc_uri=None):
cmd = "DPP_AUTH_INIT"
if peer is None:
if nfc_uri:
peer = self.dpp_nfc_uri(nfc_uri)
else:
peer = self.dpp_qr_code(uri)
cmd += " peer=%d" % peer
if own is not None:
cmd += " own=%d" % own
if role:
cmd += " role=" + role
if extra:
cmd += " " + extra
if conf:
cmd += " conf=" + conf
if configurator is not None:
cmd += " configurator=%d" % configurator
if neg_freq:
cmd += " neg_freq=%d" % neg_freq
if ssid:
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
if passphrase:
cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
if conn_status:
cmd += " conn_status=1"
res = self.request(cmd)
if expect_fail:
if "FAIL" not in res:
raise Exception("DPP authentication started unexpectedly")
return
if "OK" not in res:
raise Exception("Failed to initiate DPP Authentication")
def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
extra=None, use_id=None, ver=None):
if use_id is None:
id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
else:
id1 = use_id
cmd = "own=%d " % id1
if identifier:
cmd += "identifier=%s " % identifier
cmd += "init=1 "
if ver is not None:
cmd += "ver=" + str(ver) + " "
if role:
cmd += "role=%s " % role
if extra:
cmd += extra + " "
cmd += "code=%s" % code
res = self.request("DPP_PKEX_ADD " + cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (initiator)")
return id1
def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
listen_role=None):
id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
cmd = "own=%d " % id0
if identifier:
cmd += "identifier=%s " % identifier
cmd += "code=%s" % code
res = self.request("DPP_PKEX_ADD " + cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
self.dpp_listen(freq, role=listen_role)
def dpp_configurator_add(self, curve=None, key=None,
net_access_key_curve=None):
cmd = "DPP_CONFIGURATOR_ADD"
if curve:
cmd += " curve=" + curve
if net_access_key_curve:
cmd += " net_access_key_curve=" + curve
if key:
cmd += " key=" + key
res = self.request(cmd)
if "FAIL" in res:
raise Exception("Failed to add configurator")
return int(res)
def dpp_configurator_remove(self, conf_id):
res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
if "OK" not in res:
raise Exception("DPP_CONFIGURATOR_REMOVE failed")
def note(self, txt):
self.request("NOTE " + txt)
def send_file(self, src, dst):
self.host.send_file(src, dst)
def get_ptksa(self, bssid, cipher):
res = self.request("PTKSA_CACHE_LIST")
lines = res.splitlines()
for l in lines:
if bssid not in l or cipher not in l:
continue
vals = dict()
[index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5)
vals['index'] = index
vals['addr'] = addr
vals['cipher'] = cipher
vals['expiration'] = expiration
vals['tk'] = tk
vals['kdk'] = kdk
return vals
return None
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
global_ctrl_override=None, driver=False, set_channel=True):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Starting AP " + hostname + "/" + port + " " + ifname)
except:
logger.info("Starting AP " + ifname)
hostname = None
port = 8878
else:
ifname = apdev
logger.info("Starting AP " + ifname + " (old add_ap argument type)")
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev,
global_ctrl_override=global_ctrl_override)
hapd_global.remove(ifname)
hapd_global.add(ifname, driver=driver)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
hapd.set_defaults(set_channel=set_channel)
fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
"wpa", "wpa_deny_ptk0_rekey",
"wpa_pairwise", "rsn_pairwise", "auth_server_addr",
"acct_server_addr", "osu_server_uri"]
for field in fields:
if field in params:
hapd.set(field, params[field])
for f, v in list(params.items()):
if f in fields:
continue
if isinstance(v, list):
for val in v:
hapd.set(f, val)
else:
hapd.set(f, v)
if no_enable:
return hapd
hapd.enable()
if wait_enabled:
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
if ev is None:
raise Exception("AP startup timed out")
if "AP-ENABLED" not in ev:
raise Exception("AP startup failed")
return hapd
def add_bss(apdev, ifname, confname, ignore_error=False):
phy = utils.get_phy(apdev)
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname)
except:
logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
confname = cfg_file(apdev, confname, ifname)
hapd_global.send_file(confname, confname)
hapd_global.add_bss(phy, confname, ignore_error)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def add_iface(apdev, confname):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Starting interface " + hostname + "/" + port + " " + ifname)
except:
logger.info("Starting interface " + ifname)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
confname = cfg_file(apdev, confname, ifname)
hapd_global.send_file(confname, confname)
hapd_global.add_iface(ifname, confname)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def add_mld_link(apdev, params):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
except:
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
else:
ifname = apdev
logger.info("Adding link on: ifname=" + ifname)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
confname, ctrl_iface = cfg_mld_link_file(ifname, params)
hapd_global.send_file(confname, confname)
try:
hapd_global.add_link(ifname, confname)
except Exception as e:
if str(e) == "Could not add hostapd link":
raise utils.HwsimSkip("No MLO support in hostapd")
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
return hapd
def remove_bss(apdev, ifname=None):
if ifname == None:
ifname = apdev['ifname']
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
except:
logger.info("Removing BSS " + ifname)
hapd_global = HostapdGlobal(apdev)
hapd_global.remove(ifname)
def terminate(apdev):
try:
hostname = apdev['hostname']
port = apdev['port']
logger.info("Terminating hostapd " + hostname + "/" + port)
except:
logger.info("Terminating hostapd")
hapd_global = HostapdGlobal(apdev)
hapd_global.terminate()
def wpa3_params(ssid=None, password=None, wpa_key_mgmt="SAE",
ieee80211w="2"):
params = {"wpa": "2",
"wpa_key_mgmt": wpa_key_mgmt,
"ieee80211w": ieee80211w,
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if password:
params["sae_password"] = password
return params
def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
ieee80211w=None):
params = {"wpa": "2",
"wpa_key_mgmt": wpa_key_mgmt,
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
if ieee80211w is not None:
params["ieee80211w"] = ieee80211w
return params
def wpa_params(ssid=None, passphrase=None):
params = {"wpa": "1",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def wpa_mixed_params(ssid=None, passphrase=None):
params = {"wpa": "3",
"wpa_key_mgmt": "WPA-PSK",
"wpa_pairwise": "TKIP",
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
return params
def radius_params():
params = {"auth_server_addr": "127.0.0.1",
"auth_server_port": "1812",
"auth_server_shared_secret": "radius",
"nas_identifier": "nas.w1.fi"}
return params
def wpa_eap_params(ssid=None):
params = radius_params()
params["wpa"] = "1"
params["wpa_key_mgmt"] = "WPA-EAP"
params["wpa_pairwise"] = "TKIP"
params["ieee8021x"] = "1"
if ssid:
params["ssid"] = ssid
return params
def wpa2_eap_params(ssid=None):
params = radius_params()
params["wpa"] = "2"
params["wpa_key_mgmt"] = "WPA-EAP"
params["rsn_pairwise"] = "CCMP"
params["ieee8021x"] = "1"
if ssid:
params["ssid"] = ssid
return params
def b_only_params(channel="1", ssid=None, country=None):
params = {"hw_mode": "b",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def g_only_params(channel="1", ssid=None, country=None):
params = {"hw_mode": "g",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def a_only_params(channel="36", ssid=None, country=None):
params = {"hw_mode": "a",
"channel": channel}
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def ht20_params(channel="1", ssid=None, country=None):
params = {"ieee80211n": "1",
"channel": channel,
"hw_mode": "g"}
if int(channel) > 14:
params["hw_mode"] = "a"
if ssid:
params["ssid"] = ssid
if country:
params["country_code"] = country
return params
def ht40_plus_params(channel="1", ssid=None, country=None):
params = ht20_params(channel, ssid, country)
params['ht_capab'] = "[HT40+]"
return params
def ht40_minus_params(channel="1", ssid=None, country=None):
params = ht20_params(channel, ssid, country)
params['ht_capab'] = "[HT40-]"
return params
def cmd_execute(apdev, cmd, shell=False):
hapd_global = HostapdGlobal(apdev)
return hapd_global.cmd_execute(cmd, shell=shell)
def send_file(apdev, src, dst):
hapd_global = HostapdGlobal(apdev)
return hapd_global.send_file(src, dst)
def acl_file(dev, apdev, conf):
fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
if conf == 'hostapd.macaddr':
mac0 = dev[0].get_status_field("address")
f.write(mac0 + '\n')
f.write("02:00:00:00:00:12\n")
f.write("02:00:00:00:00:34\n")
f.write("-02:00:00:00:00:12\n")
f.write("-02:00:00:00:00:34\n")
f.write("01:01:01:01:01:01\n")
f.write("03:01:01:01:01:03\n")
elif conf == 'hostapd.accept':
mac0 = dev[0].get_status_field("address")
mac1 = dev[1].get_status_field("address")
f.write(mac0 + " 1\n")
f.write(mac1 + " 2\n")
elif conf == 'hostapd.accept2':
mac0 = dev[0].get_status_field("address")
mac1 = dev[1].get_status_field("address")
mac2 = dev[2].get_status_field("address")
f.write(mac0 + " 1\n")
f.write(mac1 + " 2\n")
f.write(mac2 + " 3\n")
else:
f.close()
os.unlink(filename)
return conf
return filename
def bssid_inc(apdev, inc=1):
parts = apdev['bssid'].split(':')
parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
parts[3], parts[4], parts[5])
return bssid
def cfg_file(apdev, conf, ifname=None):
match = re.search(r'^bss-.+', conf)
if match:
# put cfg file in /tmp directory
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
if ifname is None:
ifname = apdev['ifname']
if idx != '1':
ifname = ifname + '-' + idx
f.write("driver=nl80211\n")
f.write("ctrl_interface=/var/run/hostapd\n")
f.write("hw_mode=g\n")
f.write("channel=1\n")
f.write("ieee80211n=1\n")
if conf.startswith('bss-ht40-'):
f.write("ht_capab=[HT40+]\n")
f.write("interface=%s\n" % ifname)
f.write("ssid=bss-%s\n" % idx)
if conf == 'bss-2-dup.conf':
bssid = apdev['bssid']
else:
bssid = bssid_inc(apdev, int(idx) - 1)
f.write("bssid=%s\n" % bssid)
return fname
return conf
idx = 0
def cfg_mld_link_file(ifname, params):
global idx
ctrl_iface="/var/run/hostapd"
conf = "link-%d.conf" % idx
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
f = os.fdopen(fd, 'w')
if idx != 0:
ctrl_iface="/var/run/hostapd_%d" % idx
f.write("ctrl_interface=%s\n" % ctrl_iface)
f.write("driver=nl80211\n")
f.write("ieee80211n=1\n")
f.write("ieee80211ac=1\n")
f.write("ieee80211ax=1\n")
f.write("ieee80211be=1\n")
f.write("interface=%s\n" % ifname)
f.write("mld_ap=1\n")
f.write("mld_id=0\n")
for k, v in list(params.items()):
f.write("{}={}\n".format(k,v))
idx = idx + 1
return fname, ctrl_iface