8356a38f48
Signed-off-by: Jouni Malinen <quic_jouni@quicinc.com>
1044 lines
34 KiB
Python
1044 lines
34 KiB
Python
# Python class for controlling hostapd
|
|
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
|
|
#
|
|
# This software may be distributed under the terms of the BSD license.
|
|
# See README for more details.
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import logging
|
|
import binascii
|
|
import struct
|
|
import tempfile
|
|
import wpaspy
|
|
import remotehost
|
|
import utils
|
|
import subprocess
|
|
|
|
logger = logging.getLogger()
|
|
hapd_ctrl = '/var/run/hostapd'
|
|
hapd_global = '/var/run/hostapd-global'
|
|
|
|
def mac2tuple(mac):
|
|
return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
|
|
|
|
class HostapdGlobal:
|
|
def __init__(self, apdev=None, global_ctrl_override=None):
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
except:
|
|
hostname = None
|
|
port = 8878
|
|
self.host = remotehost.Host(hostname)
|
|
self.hostname = hostname
|
|
self.port = port
|
|
if hostname is None:
|
|
global_ctrl = hapd_global
|
|
if global_ctrl_override:
|
|
global_ctrl = global_ctrl_override
|
|
self.ctrl = wpaspy.Ctrl(global_ctrl)
|
|
self.mon = wpaspy.Ctrl(global_ctrl)
|
|
self.dbg = ""
|
|
else:
|
|
self.ctrl = wpaspy.Ctrl(hostname, port)
|
|
self.mon = wpaspy.Ctrl(hostname, port)
|
|
self.dbg = hostname + "/" + str(port)
|
|
self.mon.attach()
|
|
|
|
def cmd_execute(self, cmd_array, shell=False):
|
|
if self.hostname is None:
|
|
if shell:
|
|
cmd = ' '.join(cmd_array)
|
|
else:
|
|
cmd = cmd_array
|
|
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
|
|
stdout=subprocess.PIPE, shell=shell)
|
|
out = proc.communicate()[0]
|
|
ret = proc.returncode
|
|
return ret, out.decode()
|
|
else:
|
|
return self.host.execute(cmd_array)
|
|
|
|
def request(self, cmd, timeout=10):
|
|
logger.debug(self.dbg + ": CTRL(global): " + cmd)
|
|
return self.ctrl.request(cmd, timeout)
|
|
|
|
def wait_event(self, events, timeout):
|
|
start = os.times()[4]
|
|
while True:
|
|
while self.mon.pending():
|
|
ev = self.mon.recv()
|
|
logger.debug(self.dbg + "(global): " + ev)
|
|
for event in events:
|
|
if event in ev:
|
|
return ev
|
|
now = os.times()[4]
|
|
remaining = start + timeout - now
|
|
if remaining <= 0:
|
|
break
|
|
if not self.mon.pending(timeout=remaining):
|
|
break
|
|
return None
|
|
|
|
def add(self, ifname, driver=None):
|
|
cmd = "ADD " + ifname + " " + hapd_ctrl
|
|
if driver:
|
|
cmd += " " + driver
|
|
res = self.request(cmd)
|
|
if "OK" not in res:
|
|
raise Exception("Could not add hostapd interface " + ifname)
|
|
|
|
def add_iface(self, ifname, confname):
|
|
res = self.request("ADD " + ifname + " config=" + confname)
|
|
if "OK" not in res:
|
|
raise Exception("Could not add hostapd interface")
|
|
|
|
def add_bss(self, phy, confname, ignore_error=False):
|
|
res = self.request("ADD bss_config=" + phy + ":" + confname)
|
|
if "OK" not in res:
|
|
if not ignore_error:
|
|
raise Exception("Could not add hostapd BSS")
|
|
|
|
def add_link(self, ifname, confname):
|
|
res = self.request("ADD " + ifname + " config=" + confname)
|
|
if "OK" not in res:
|
|
raise Exception("Could not add hostapd link")
|
|
|
|
def remove(self, ifname):
|
|
self.request("REMOVE " + ifname, timeout=30)
|
|
|
|
def relog(self):
|
|
self.request("RELOG")
|
|
|
|
def flush(self):
|
|
self.request("FLUSH")
|
|
|
|
def get_ctrl_iface_port(self, ifname):
|
|
if self.hostname is None:
|
|
return None
|
|
|
|
res = self.request("INTERFACES ctrl")
|
|
lines = res.splitlines()
|
|
found = False
|
|
for line in lines:
|
|
words = line.split()
|
|
if words[0] == ifname:
|
|
found = True
|
|
break
|
|
if not found:
|
|
raise Exception("Could not find UDP port for " + ifname)
|
|
res = line.find("ctrl_iface=udp:")
|
|
if res == -1:
|
|
raise Exception("Wrong ctrl_interface format")
|
|
words = line.split(":")
|
|
return int(words[1])
|
|
|
|
def terminate(self):
|
|
self.mon.detach()
|
|
self.mon.close()
|
|
self.mon = None
|
|
self.ctrl.terminate()
|
|
self.ctrl = None
|
|
|
|
def send_file(self, src, dst):
|
|
self.host.send_file(src, dst)
|
|
|
|
class Hostapd:
|
|
def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl,
|
|
port=8877):
|
|
self.hostname = hostname
|
|
self.host = remotehost.Host(hostname, ifname)
|
|
self.ifname = ifname
|
|
if hostname is None:
|
|
self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
|
|
self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
|
|
self.dbg = ifname
|
|
else:
|
|
self.ctrl = wpaspy.Ctrl(hostname, port)
|
|
self.mon = wpaspy.Ctrl(hostname, port)
|
|
self.dbg = hostname + "/" + ifname
|
|
self.mon.attach()
|
|
self.bssid = None
|
|
self.bssidx = bssidx
|
|
self.mld_addr = None
|
|
|
|
def cmd_execute(self, cmd_array, shell=False):
|
|
if self.hostname is None:
|
|
if shell:
|
|
cmd = ' '.join(cmd_array)
|
|
else:
|
|
cmd = cmd_array
|
|
proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
|
|
stdout=subprocess.PIPE, shell=shell)
|
|
out = proc.communicate()[0]
|
|
ret = proc.returncode
|
|
return ret, out.decode()
|
|
else:
|
|
return self.host.execute(cmd_array)
|
|
|
|
def close_ctrl(self):
|
|
if self.mon is not None:
|
|
self.mon.detach()
|
|
self.mon.close()
|
|
self.mon = None
|
|
self.ctrl.close()
|
|
self.ctrl = None
|
|
|
|
def own_addr(self):
|
|
if self.bssid is None:
|
|
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
|
|
return self.bssid
|
|
|
|
def own_mld_addr(self):
|
|
if self.mld_addr is None:
|
|
self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
|
|
return self.mld_addr
|
|
|
|
def get_addr(self, group=False):
|
|
if self.own_mld_addr() is None:
|
|
return self.own_addr()
|
|
return self.own_mld_addr()
|
|
|
|
def request(self, cmd):
|
|
logger.debug(self.dbg + ": CTRL: " + cmd)
|
|
return self.ctrl.request(cmd)
|
|
|
|
def ping(self):
|
|
return "PONG" in self.request("PING")
|
|
|
|
def set(self, field, value):
|
|
if "OK" not in self.request("SET " + field + " " + value):
|
|
if "TKIP" in value and (field == "wpa_pairwise" or \
|
|
field == "rsn_pairwise"):
|
|
raise utils.HwsimSkip("Cipher TKIP not supported")
|
|
raise Exception("Failed to set hostapd parameter " + field)
|
|
|
|
def set_defaults(self, set_channel=True):
|
|
self.set("driver", "nl80211")
|
|
if set_channel:
|
|
self.set("hw_mode", "g")
|
|
self.set("channel", "1")
|
|
self.set("ieee80211n", "1")
|
|
self.set("logger_stdout", "-1")
|
|
self.set("logger_stdout_level", "0")
|
|
|
|
def set_open(self, ssid):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
|
|
def set_wpa2_psk(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "2")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("rsn_pairwise", "CCMP")
|
|
|
|
def set_wpa_psk(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "1")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("wpa_pairwise", "TKIP")
|
|
|
|
def set_wpa_psk_mixed(self, ssid, passphrase):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wpa_passphrase", passphrase)
|
|
self.set("wpa", "3")
|
|
self.set("wpa_key_mgmt", "WPA-PSK")
|
|
self.set("wpa_pairwise", "TKIP")
|
|
self.set("rsn_pairwise", "CCMP")
|
|
|
|
def set_wep(self, ssid, key):
|
|
self.set_defaults()
|
|
self.set("ssid", ssid)
|
|
self.set("wep_key0", key)
|
|
|
|
def enable(self):
|
|
if "OK" not in self.request("ENABLE"):
|
|
raise Exception("Failed to enable hostapd interface " + self.ifname)
|
|
|
|
def disable(self):
|
|
if "OK" not in self.request("DISABLE"):
|
|
raise Exception("Failed to disable hostapd interface " + self.ifname)
|
|
|
|
def link_remove(self, count=10):
|
|
if "OK" not in self.request("LINK_REMOVE %u" % count):
|
|
raise Exception("Failed to remove hostapd link " + self.ifname)
|
|
|
|
def dump_monitor(self):
|
|
while self.mon.pending():
|
|
ev = self.mon.recv()
|
|
logger.debug(self.dbg + ": " + ev)
|
|
|
|
def wait_event(self, events, timeout):
|
|
if not isinstance(events, list):
|
|
raise Exception("Hostapd.wait_event() called with incorrect events argument type")
|
|
start = os.times()[4]
|
|
while True:
|
|
while self.mon.pending():
|
|
ev = self.mon.recv()
|
|
logger.debug(self.dbg + ": " + ev)
|
|
for event in events:
|
|
if event in ev:
|
|
return ev
|
|
now = os.times()[4]
|
|
remaining = start + timeout - now
|
|
if remaining <= 0:
|
|
break
|
|
if not self.mon.pending(timeout=remaining):
|
|
break
|
|
return None
|
|
|
|
def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False):
|
|
ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
|
|
if ev is None:
|
|
raise Exception("AP did not report STA connection")
|
|
if addr and addr not in ev:
|
|
raise Exception("Unexpected STA address in connection event: " + ev)
|
|
if wait_4way_hs:
|
|
ev2 = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"],
|
|
timeout=timeout)
|
|
if ev2 is None:
|
|
raise Exception("AP did not report 4-way handshake completion")
|
|
if addr and addr not in ev2:
|
|
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2)
|
|
return ev
|
|
|
|
def wait_sta_disconnect(self, addr=None, timeout=2):
|
|
ev = self.wait_event(["AP-STA-DISCONNECT"], timeout=timeout)
|
|
if ev is None:
|
|
raise Exception("AP did not report STA disconnection")
|
|
if addr and addr not in ev:
|
|
raise Exception("Unexpected STA address in disconnection event: " + ev)
|
|
return ev
|
|
|
|
def wait_4way_hs(self, addr=None, timeout=1):
|
|
ev = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=timeout)
|
|
if ev is None:
|
|
raise Exception("hostapd did not report 4-way handshake completion")
|
|
if addr and addr not in ev:
|
|
raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev)
|
|
return ev
|
|
|
|
def wait_ptkinitdone(self, addr, timeout=2):
|
|
while timeout > 0:
|
|
sta = self.get_sta(addr)
|
|
if 'hostapdWPAPTKState' not in sta:
|
|
raise Exception("GET_STA did not return hostapdWPAPTKState")
|
|
state = sta['hostapdWPAPTKState']
|
|
if state == "11":
|
|
return
|
|
time.sleep(0.1)
|
|
timeout -= 0.1
|
|
raise Exception("Timeout while waiting for PTKINITDONE")
|
|
|
|
def get_status(self):
|
|
res = self.request("STATUS")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name, value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_status_field(self, field):
|
|
vals = self.get_status()
|
|
if field in vals:
|
|
return vals[field]
|
|
return None
|
|
|
|
def get_driver_status(self):
|
|
res = self.request("STATUS-DRIVER")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name, value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_driver_status_field(self, field):
|
|
vals = self.get_driver_status()
|
|
if field in vals:
|
|
return vals[field]
|
|
return None
|
|
|
|
def get_config(self):
|
|
res = self.request("GET_CONFIG")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
[name, value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def mgmt_rx(self, timeout=5):
|
|
ev = self.wait_event(["MGMT-RX"], timeout=timeout)
|
|
if ev is None:
|
|
return None
|
|
msg = {}
|
|
frame = binascii.unhexlify(ev.split(' ')[1])
|
|
msg['frame'] = frame
|
|
|
|
hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
|
|
msg['fc'] = hdr[0]
|
|
msg['subtype'] = (hdr[0] >> 4) & 0xf
|
|
hdr = hdr[1:]
|
|
msg['duration'] = hdr[0]
|
|
hdr = hdr[1:]
|
|
msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
|
|
hdr = hdr[6:]
|
|
msg['seq_ctrl'] = hdr[0]
|
|
msg['payload'] = frame[24:]
|
|
|
|
return msg
|
|
|
|
def mgmt_tx(self, msg):
|
|
t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
|
|
hdr = struct.pack('<HH6B6B6BH', *t)
|
|
res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
|
|
if "OK" not in res:
|
|
raise Exception("MGMT_TX command to hostapd failed")
|
|
|
|
def get_sta(self, addr, info=None, next=False):
|
|
cmd = "STA-NEXT " if next else "STA "
|
|
if addr is None:
|
|
res = self.request("STA-FIRST")
|
|
elif info:
|
|
res = self.request(cmd + addr + " " + info)
|
|
else:
|
|
res = self.request(cmd + addr)
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
first = True
|
|
for l in lines:
|
|
if first and '=' not in l:
|
|
vals['addr'] = l
|
|
first = False
|
|
else:
|
|
[name, value] = l.split('=', 1)
|
|
vals[name] = value
|
|
return vals
|
|
|
|
def get_mib(self, param=None):
|
|
if param:
|
|
res = self.request("MIB " + param)
|
|
else:
|
|
res = self.request("MIB")
|
|
lines = res.splitlines()
|
|
vals = dict()
|
|
for l in lines:
|
|
name_val = l.split('=', 1)
|
|
if len(name_val) > 1:
|
|
vals[name_val[0]] = name_val[1]
|
|
return vals
|
|
|
|
def get_pmksa(self, addr):
|
|
res = self.request("PMKSA")
|
|
lines = res.splitlines()
|
|
for l in lines:
|
|
if addr not in l:
|
|
continue
|
|
vals = dict()
|
|
[index, aa, pmkid, expiration, opportunistic] = l.split(' ')
|
|
vals['index'] = index
|
|
vals['pmkid'] = pmkid
|
|
vals['expiration'] = expiration
|
|
vals['opportunistic'] = opportunistic
|
|
return vals
|
|
return None
|
|
|
|
def dpp_qr_code(self, uri):
|
|
res = self.request("DPP_QR_CODE " + uri)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to parse QR Code URI")
|
|
return int(res)
|
|
|
|
def dpp_nfc_uri(self, uri):
|
|
res = self.request("DPP_NFC_URI " + uri)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to parse NFC URI")
|
|
return int(res)
|
|
|
|
def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
|
|
curve=None, key=None, supported_curves=None,
|
|
host=None):
|
|
cmd = "DPP_BOOTSTRAP_GEN type=" + type
|
|
if chan:
|
|
cmd += " chan=" + chan
|
|
if mac:
|
|
if mac is True:
|
|
mac = self.own_addr()
|
|
cmd += " mac=" + mac.replace(':', '')
|
|
if info:
|
|
cmd += " info=" + info
|
|
if curve:
|
|
cmd += " curve=" + curve
|
|
if key:
|
|
cmd += " key=" + key
|
|
if supported_curves:
|
|
cmd += " supported_curves=" + supported_curves
|
|
if host:
|
|
cmd += " host=" + host
|
|
res = self.request(cmd)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to generate bootstrapping info")
|
|
return int(res)
|
|
|
|
def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None,
|
|
extra=None):
|
|
cmd = "DPP_BOOTSTRAP_SET %d" % id
|
|
if ssid:
|
|
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
|
|
if extra:
|
|
cmd += " " + extra
|
|
if conf:
|
|
cmd += " conf=" + conf
|
|
if configurator is not None:
|
|
cmd += " configurator=%d" % configurator
|
|
if "OK" not in self.request(cmd):
|
|
raise Exception("Failed to set bootstrapping parameters")
|
|
|
|
def dpp_listen(self, freq, netrole=None, qr=None, role=None):
|
|
cmd = "DPP_LISTEN " + str(freq)
|
|
if netrole:
|
|
cmd += " netrole=" + netrole
|
|
if qr:
|
|
cmd += " qr=" + qr
|
|
if role:
|
|
cmd += " role=" + role
|
|
if "OK" not in self.request(cmd):
|
|
raise Exception("Failed to start listen operation")
|
|
|
|
def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
|
|
extra=None, own=None, role=None, neg_freq=None,
|
|
ssid=None, passphrase=None, expect_fail=False,
|
|
conn_status=False, nfc_uri=None):
|
|
cmd = "DPP_AUTH_INIT"
|
|
if peer is None:
|
|
if nfc_uri:
|
|
peer = self.dpp_nfc_uri(nfc_uri)
|
|
else:
|
|
peer = self.dpp_qr_code(uri)
|
|
cmd += " peer=%d" % peer
|
|
if own is not None:
|
|
cmd += " own=%d" % own
|
|
if role:
|
|
cmd += " role=" + role
|
|
if extra:
|
|
cmd += " " + extra
|
|
if conf:
|
|
cmd += " conf=" + conf
|
|
if configurator is not None:
|
|
cmd += " configurator=%d" % configurator
|
|
if neg_freq:
|
|
cmd += " neg_freq=%d" % neg_freq
|
|
if ssid:
|
|
cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
|
|
if passphrase:
|
|
cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
|
|
if conn_status:
|
|
cmd += " conn_status=1"
|
|
res = self.request(cmd)
|
|
if expect_fail:
|
|
if "FAIL" not in res:
|
|
raise Exception("DPP authentication started unexpectedly")
|
|
return
|
|
if "OK" not in res:
|
|
raise Exception("Failed to initiate DPP Authentication")
|
|
|
|
def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
|
|
extra=None, use_id=None, ver=None):
|
|
if use_id is None:
|
|
id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
|
|
else:
|
|
id1 = use_id
|
|
cmd = "own=%d " % id1
|
|
if identifier:
|
|
cmd += "identifier=%s " % identifier
|
|
cmd += "init=1 "
|
|
if ver is not None:
|
|
cmd += "ver=" + str(ver) + " "
|
|
if role:
|
|
cmd += "role=%s " % role
|
|
if extra:
|
|
cmd += extra + " "
|
|
cmd += "code=%s" % code
|
|
res = self.request("DPP_PKEX_ADD " + cmd)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to set PKEX data (initiator)")
|
|
return id1
|
|
|
|
def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
|
|
listen_role=None):
|
|
id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
|
|
cmd = "own=%d " % id0
|
|
if identifier:
|
|
cmd += "identifier=%s " % identifier
|
|
cmd += "code=%s" % code
|
|
res = self.request("DPP_PKEX_ADD " + cmd)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to set PKEX data (responder)")
|
|
self.dpp_listen(freq, role=listen_role)
|
|
|
|
def dpp_configurator_add(self, curve=None, key=None,
|
|
net_access_key_curve=None):
|
|
cmd = "DPP_CONFIGURATOR_ADD"
|
|
if curve:
|
|
cmd += " curve=" + curve
|
|
if net_access_key_curve:
|
|
cmd += " net_access_key_curve=" + curve
|
|
if key:
|
|
cmd += " key=" + key
|
|
res = self.request(cmd)
|
|
if "FAIL" in res:
|
|
raise Exception("Failed to add configurator")
|
|
return int(res)
|
|
|
|
def dpp_configurator_remove(self, conf_id):
|
|
res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
|
|
if "OK" not in res:
|
|
raise Exception("DPP_CONFIGURATOR_REMOVE failed")
|
|
|
|
def note(self, txt):
|
|
self.request("NOTE " + txt)
|
|
|
|
def send_file(self, src, dst):
|
|
self.host.send_file(src, dst)
|
|
|
|
def get_ptksa(self, bssid, cipher):
|
|
res = self.request("PTKSA_CACHE_LIST")
|
|
lines = res.splitlines()
|
|
for l in lines:
|
|
if bssid not in l or cipher not in l:
|
|
continue
|
|
vals = dict()
|
|
[index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5)
|
|
vals['index'] = index
|
|
vals['addr'] = addr
|
|
vals['cipher'] = cipher
|
|
vals['expiration'] = expiration
|
|
vals['tk'] = tk
|
|
vals['kdk'] = kdk
|
|
return vals
|
|
return None
|
|
|
|
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
|
|
global_ctrl_override=None, driver=False, set_channel=True):
|
|
if isinstance(apdev, dict):
|
|
ifname = apdev['ifname']
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Starting AP " + hostname + "/" + port + " " + ifname)
|
|
except:
|
|
logger.info("Starting AP " + ifname)
|
|
hostname = None
|
|
port = 8878
|
|
else:
|
|
ifname = apdev
|
|
logger.info("Starting AP " + ifname + " (old add_ap argument type)")
|
|
hostname = None
|
|
port = 8878
|
|
hapd_global = HostapdGlobal(apdev,
|
|
global_ctrl_override=global_ctrl_override)
|
|
hapd_global.remove(ifname)
|
|
hapd_global.add(ifname, driver=driver)
|
|
port = hapd_global.get_ctrl_iface_port(ifname)
|
|
hapd = Hostapd(ifname, hostname=hostname, port=port)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
hapd.set_defaults(set_channel=set_channel)
|
|
fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
|
|
"wpa", "wpa_deny_ptk0_rekey",
|
|
"wpa_pairwise", "rsn_pairwise", "auth_server_addr",
|
|
"acct_server_addr", "osu_server_uri"]
|
|
for field in fields:
|
|
if field in params:
|
|
hapd.set(field, params[field])
|
|
for f, v in list(params.items()):
|
|
if f in fields:
|
|
continue
|
|
if isinstance(v, list):
|
|
for val in v:
|
|
hapd.set(f, val)
|
|
else:
|
|
hapd.set(f, v)
|
|
if no_enable:
|
|
return hapd
|
|
hapd.enable()
|
|
if wait_enabled:
|
|
ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
|
|
if ev is None:
|
|
raise Exception("AP startup timed out")
|
|
if "AP-ENABLED" not in ev:
|
|
raise Exception("AP startup failed")
|
|
return hapd
|
|
|
|
def add_bss(apdev, ifname, confname, ignore_error=False):
|
|
phy = utils.get_phy(apdev)
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname)
|
|
except:
|
|
logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
|
|
hostname = None
|
|
port = 8878
|
|
hapd_global = HostapdGlobal(apdev)
|
|
confname = cfg_file(apdev, confname, ifname)
|
|
hapd_global.send_file(confname, confname)
|
|
hapd_global.add_bss(phy, confname, ignore_error)
|
|
port = hapd_global.get_ctrl_iface_port(ifname)
|
|
hapd = Hostapd(ifname, hostname=hostname, port=port)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
return hapd
|
|
|
|
def add_iface(apdev, confname):
|
|
ifname = apdev['ifname']
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Starting interface " + hostname + "/" + port + " " + ifname)
|
|
except:
|
|
logger.info("Starting interface " + ifname)
|
|
hostname = None
|
|
port = 8878
|
|
hapd_global = HostapdGlobal(apdev)
|
|
confname = cfg_file(apdev, confname, ifname)
|
|
hapd_global.send_file(confname, confname)
|
|
hapd_global.add_iface(ifname, confname)
|
|
port = hapd_global.get_ctrl_iface_port(ifname)
|
|
hapd = Hostapd(ifname, hostname=hostname, port=port)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
return hapd
|
|
|
|
def add_mld_link(apdev, params):
|
|
if isinstance(apdev, dict):
|
|
ifname = apdev['ifname']
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
|
|
except:
|
|
logger.info("Adding link on: ifname=" + ifname)
|
|
hostname = None
|
|
port = 8878
|
|
else:
|
|
ifname = apdev
|
|
logger.info("Adding link on: ifname=" + ifname)
|
|
hostname = None
|
|
port = 8878
|
|
|
|
hapd_global = HostapdGlobal(apdev)
|
|
confname, ctrl_iface = cfg_mld_link_file(ifname, params)
|
|
hapd_global.send_file(confname, confname)
|
|
try:
|
|
hapd_global.add_link(ifname, confname)
|
|
except Exception as e:
|
|
if str(e) == "Could not add hostapd link":
|
|
raise utils.HwsimSkip("No MLO support in hostapd")
|
|
port = hapd_global.get_ctrl_iface_port(ifname)
|
|
hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port)
|
|
if not hapd.ping():
|
|
raise Exception("Could not ping hostapd")
|
|
return hapd
|
|
|
|
def remove_bss(apdev, ifname=None):
|
|
if ifname == None:
|
|
ifname = apdev['ifname']
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
|
|
except:
|
|
logger.info("Removing BSS " + ifname)
|
|
hapd_global = HostapdGlobal(apdev)
|
|
hapd_global.remove(ifname)
|
|
|
|
def terminate(apdev):
|
|
try:
|
|
hostname = apdev['hostname']
|
|
port = apdev['port']
|
|
logger.info("Terminating hostapd " + hostname + "/" + port)
|
|
except:
|
|
logger.info("Terminating hostapd")
|
|
hapd_global = HostapdGlobal(apdev)
|
|
hapd_global.terminate()
|
|
|
|
def wpa3_params(ssid=None, password=None, wpa_key_mgmt="SAE",
|
|
ieee80211w="2"):
|
|
params = {"wpa": "2",
|
|
"wpa_key_mgmt": wpa_key_mgmt,
|
|
"ieee80211w": ieee80211w,
|
|
"rsn_pairwise": "CCMP"}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if password:
|
|
params["sae_password"] = password
|
|
return params
|
|
|
|
def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
|
|
ieee80211w=None):
|
|
params = {"wpa": "2",
|
|
"wpa_key_mgmt": wpa_key_mgmt,
|
|
"rsn_pairwise": "CCMP"}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
if ieee80211w is not None:
|
|
params["ieee80211w"] = ieee80211w
|
|
return params
|
|
|
|
def wpa_params(ssid=None, passphrase=None):
|
|
params = {"wpa": "1",
|
|
"wpa_key_mgmt": "WPA-PSK",
|
|
"wpa_pairwise": "TKIP"}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
return params
|
|
|
|
def wpa_mixed_params(ssid=None, passphrase=None):
|
|
params = {"wpa": "3",
|
|
"wpa_key_mgmt": "WPA-PSK",
|
|
"wpa_pairwise": "TKIP",
|
|
"rsn_pairwise": "CCMP"}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
return params
|
|
|
|
def radius_params():
|
|
params = {"auth_server_addr": "127.0.0.1",
|
|
"auth_server_port": "1812",
|
|
"auth_server_shared_secret": "radius",
|
|
"nas_identifier": "nas.w1.fi"}
|
|
return params
|
|
|
|
def wpa_eap_params(ssid=None):
|
|
params = radius_params()
|
|
params["wpa"] = "1"
|
|
params["wpa_key_mgmt"] = "WPA-EAP"
|
|
params["wpa_pairwise"] = "TKIP"
|
|
params["ieee8021x"] = "1"
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
return params
|
|
|
|
def wpa2_eap_params(ssid=None):
|
|
params = radius_params()
|
|
params["wpa"] = "2"
|
|
params["wpa_key_mgmt"] = "WPA-EAP"
|
|
params["rsn_pairwise"] = "CCMP"
|
|
params["ieee8021x"] = "1"
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
return params
|
|
|
|
def b_only_params(channel="1", ssid=None, country=None):
|
|
params = {"hw_mode": "b",
|
|
"channel": channel}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if country:
|
|
params["country_code"] = country
|
|
return params
|
|
|
|
def g_only_params(channel="1", ssid=None, country=None):
|
|
params = {"hw_mode": "g",
|
|
"channel": channel}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if country:
|
|
params["country_code"] = country
|
|
return params
|
|
|
|
def a_only_params(channel="36", ssid=None, country=None):
|
|
params = {"hw_mode": "a",
|
|
"channel": channel}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if country:
|
|
params["country_code"] = country
|
|
return params
|
|
|
|
def ht20_params(channel="1", ssid=None, country=None):
|
|
params = {"ieee80211n": "1",
|
|
"channel": channel,
|
|
"hw_mode": "g"}
|
|
if int(channel) > 14:
|
|
params["hw_mode"] = "a"
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
if country:
|
|
params["country_code"] = country
|
|
return params
|
|
|
|
def ht40_plus_params(channel="1", ssid=None, country=None):
|
|
params = ht20_params(channel, ssid, country)
|
|
params['ht_capab'] = "[HT40+]"
|
|
return params
|
|
|
|
def ht40_minus_params(channel="1", ssid=None, country=None):
|
|
params = ht20_params(channel, ssid, country)
|
|
params['ht_capab'] = "[HT40-]"
|
|
return params
|
|
|
|
def he_params(ssid=None):
|
|
params = {"ssid": "he6ghz",
|
|
"ieee80211n": "1",
|
|
"ieee80211ac": "1",
|
|
"wmm_enabled": "1",
|
|
"channel": "5",
|
|
"op_class": "131",
|
|
"ieee80211ax": "1",
|
|
"hw_mode": "a",
|
|
"he_oper_centr_freq_seg0_idx": "15",
|
|
"he_oper_chwidth": "2",
|
|
"vht_oper_chwidth": "2"}
|
|
if ssid:
|
|
params["ssid"] = ssid
|
|
|
|
return params
|
|
|
|
def he_wpa2_params(ssid=None, wpa_key_mgmt="SAE", rsn_pairwise="CCMP",
|
|
group_cipher="CCMP", sae_pwe="1", passphrase=None):
|
|
params = he_params(ssid)
|
|
params["wpa"] = "2"
|
|
params["wpa_key_mgmt"] = wpa_key_mgmt
|
|
params["rsn_pairwise"] = rsn_pairwise
|
|
params["group_cipher"] = group_cipher
|
|
params["ieee80211w"] = "2"
|
|
if "SAE" in wpa_key_mgmt:
|
|
params["sae_pwe"] = sae_pwe
|
|
params["sae_groups"] = "19"
|
|
|
|
if passphrase:
|
|
params["wpa_passphrase"] = passphrase
|
|
|
|
return params
|
|
|
|
def cmd_execute(apdev, cmd, shell=False):
|
|
hapd_global = HostapdGlobal(apdev)
|
|
return hapd_global.cmd_execute(cmd, shell=shell)
|
|
|
|
def send_file(apdev, src, dst):
|
|
hapd_global = HostapdGlobal(apdev)
|
|
return hapd_global.send_file(src, dst)
|
|
|
|
def acl_file(dev, apdev, conf):
|
|
fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
|
|
f = os.fdopen(fd, 'w')
|
|
|
|
if conf == 'hostapd.macaddr':
|
|
mac0 = dev[0].get_status_field("address")
|
|
f.write(mac0 + '\n')
|
|
f.write("02:00:00:00:00:12\n")
|
|
f.write("02:00:00:00:00:34\n")
|
|
f.write("-02:00:00:00:00:12\n")
|
|
f.write("-02:00:00:00:00:34\n")
|
|
f.write("01:01:01:01:01:01\n")
|
|
f.write("03:01:01:01:01:03\n")
|
|
elif conf == 'hostapd.accept':
|
|
mac0 = dev[0].get_status_field("address")
|
|
mac1 = dev[1].get_status_field("address")
|
|
f.write(mac0 + " 1\n")
|
|
f.write(mac1 + " 2\n")
|
|
elif conf == 'hostapd.accept2':
|
|
mac0 = dev[0].get_status_field("address")
|
|
mac1 = dev[1].get_status_field("address")
|
|
mac2 = dev[2].get_status_field("address")
|
|
f.write(mac0 + " 1\n")
|
|
f.write(mac1 + " 2\n")
|
|
f.write(mac2 + " 3\n")
|
|
else:
|
|
f.close()
|
|
os.unlink(filename)
|
|
return conf
|
|
|
|
return filename
|
|
|
|
def bssid_inc(apdev, inc=1):
|
|
parts = apdev['bssid'].split(':')
|
|
parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
|
|
bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
|
|
parts[3], parts[4], parts[5])
|
|
return bssid
|
|
|
|
def cfg_file(apdev, conf, ifname=None):
|
|
match = re.search(r'^bss-.+', conf)
|
|
if match:
|
|
# put cfg file in /tmp directory
|
|
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
|
|
f = os.fdopen(fd, 'w')
|
|
idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
|
|
if ifname is None:
|
|
ifname = apdev['ifname']
|
|
if idx != '1':
|
|
ifname = ifname + '-' + idx
|
|
|
|
f.write("driver=nl80211\n")
|
|
f.write("ctrl_interface=/var/run/hostapd\n")
|
|
f.write("hw_mode=g\n")
|
|
f.write("channel=1\n")
|
|
f.write("ieee80211n=1\n")
|
|
if conf.startswith('bss-ht40-'):
|
|
f.write("ht_capab=[HT40+]\n")
|
|
f.write("interface=%s\n" % ifname)
|
|
|
|
f.write("ssid=bss-%s\n" % idx)
|
|
if conf == 'bss-2-dup.conf':
|
|
bssid = apdev['bssid']
|
|
else:
|
|
bssid = bssid_inc(apdev, int(idx) - 1)
|
|
f.write("bssid=%s\n" % bssid)
|
|
|
|
return fname
|
|
|
|
return conf
|
|
|
|
idx = 0
|
|
def cfg_mld_link_file(ifname, params):
|
|
global idx
|
|
ctrl_iface="/var/run/hostapd"
|
|
conf = "link-%d.conf" % idx
|
|
|
|
fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
|
|
f = os.fdopen(fd, 'w')
|
|
|
|
if idx != 0:
|
|
ctrl_iface="/var/run/hostapd_%d" % idx
|
|
|
|
f.write("ctrl_interface=%s\n" % ctrl_iface)
|
|
f.write("driver=nl80211\n")
|
|
f.write("ieee80211n=1\n")
|
|
if 'hw_mode' in params and params['hw_mode'] == 'a' and \
|
|
('op_class' not in params or \
|
|
int(params['op_class']) not in [131, 132, 133, 134, 135, 136, 137]):
|
|
f.write("ieee80211ac=1\n")
|
|
f.write("ieee80211ax=1\n")
|
|
f.write("ieee80211be=1\n")
|
|
f.write("interface=%s\n" % ifname)
|
|
f.write("mld_ap=1\n")
|
|
f.write("mld_id=0\n")
|
|
|
|
for k, v in list(params.items()):
|
|
f.write("{}={}\n".format(k,v))
|
|
|
|
idx = idx + 1
|
|
|
|
return fname, ctrl_iface
|