# Python class for controlling hostapd # Copyright (c) 2013-2019, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. import os import re import time import logging import binascii import struct import tempfile import wpaspy import remotehost import utils import subprocess from remotectrl import RemoteCtrl logger = logging.getLogger() hapd_ctrl = '/var/run/hostapd' hapd_global = '/var/run/hostapd-global' def mac2tuple(mac): return struct.unpack('6B', binascii.unhexlify(mac.replace(':', ''))) class HostapdGlobal: def __init__(self, apdev=None, global_ctrl_override=None): try: hostname = apdev['hostname'] port = apdev['port'] if 'remote_cli' in apdev: remote_cli = apdev['remote_cli'] else: remote_cli = False except: hostname = None port = 8878 remote_cli = False self.host = remotehost.Host(hostname) self.hostname = hostname self.port = port self.remote_cli = remote_cli if hostname is None: global_ctrl = hapd_global if global_ctrl_override: global_ctrl = global_ctrl_override self.ctrl = wpaspy.Ctrl(global_ctrl) self.mon = wpaspy.Ctrl(global_ctrl) self.dbg = "" else: if remote_cli: global_ctrl = hapd_global if global_ctrl_override: global_ctrl = global_ctrl_override self.ctrl = RemoteCtrl(global_ctrl, port, hostname=hostname) self.mon = RemoteCtrl(global_ctrl, port, hostname=hostname) self.dbg = hostname + "/global" else: self.ctrl = wpaspy.Ctrl(hostname, port) self.mon = wpaspy.Ctrl(hostname, port) self.dbg = hostname + "/" + str(port) self.mon.attach() def cmd_execute(self, cmd_array, shell=False): if self.hostname is None: if shell: cmd = ' '.join(cmd_array) else: cmd = cmd_array proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=shell) out = proc.communicate()[0] ret = proc.returncode return ret, out.decode() else: return self.host.execute(cmd_array) def request(self, cmd, timeout=10): logger.debug(self.dbg + ": CTRL(global): " + cmd) return self.ctrl.request(cmd, timeout) def wait_event(self, events, timeout): start = os.times()[4] while True: while self.mon.pending(): ev = self.mon.recv() logger.debug(self.dbg + "(global): " + ev) for event in events: if event in ev: return ev now = os.times()[4] remaining = start + timeout - now if remaining <= 0: break if not self.mon.pending(timeout=remaining): break return None def add(self, ifname, driver=None): cmd = "ADD " + ifname + " " + hapd_ctrl if driver: cmd += " " + driver res = self.request(cmd) if "OK" not in res: raise Exception("Could not add hostapd interface " + ifname) def add_iface(self, ifname, confname): res = self.request("ADD " + ifname + " config=" + confname) if "OK" not in res: raise Exception("Could not add hostapd interface") def add_bss(self, phy, confname, ignore_error=False): res = self.request("ADD bss_config=" + phy + ":" + confname) if "OK" not in res: if not ignore_error: raise Exception("Could not add hostapd BSS") def add_link(self, ifname, confname): res = self.request("ADD " + ifname + " config=" + confname) if "OK" not in res: raise Exception("Could not add hostapd link") def remove(self, ifname): self.request("REMOVE " + ifname, timeout=30) def relog(self): self.request("RELOG") def flush(self): self.request("FLUSH") def get_ctrl_iface_port(self, ifname): if self.hostname is None: return None if self.remote_cli: return None res = self.request("INTERFACES ctrl") lines = res.splitlines() found = False for line in lines: words = line.split() if words[0] == ifname: found = True break if not found: raise Exception("Could not find UDP port for " + ifname) res = line.find("ctrl_iface=udp:") if res == -1: raise Exception("Wrong ctrl_interface format") words = line.split(":") return int(words[1]) def terminate(self): self.mon.detach() self.mon.close() self.mon = None self.ctrl.terminate() self.ctrl = None def send_file(self, src, dst): self.host.send_file(src, dst) class Hostapd: def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl, port=8877, remote_cli=False, link=None): self.hostname = hostname self.host = remotehost.Host(hostname, ifname) self.ifname = ifname self.remote_cli = remote_cli if hostname is None: if link is not None: ifname = ifname + "_link" + str(link) self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname)) self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname)) self.dbg = ifname else: if remote_cli: self.ctrl = RemoteCtrl(ctrl, port, hostname=hostname, ifname=ifname) self.mon = RemoteCtrl(ctrl, port, hostname=hostname, ifname=ifname) else: self.ctrl = wpaspy.Ctrl(hostname, port) self.mon = wpaspy.Ctrl(hostname, port) self.dbg = hostname + "/" + ifname self.mon.attach() self.bssid = None self.bssidx = bssidx self.mld_addr = None def cmd_execute(self, cmd_array, shell=False): if self.hostname is None: if shell: cmd = ' '.join(cmd_array) else: cmd = cmd_array proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=shell) out = proc.communicate()[0] ret = proc.returncode return ret, out.decode() else: return self.host.execute(cmd_array) def close_ctrl(self): if self.mon is not None: self.mon.detach() self.mon.close() self.mon = None self.ctrl.close() self.ctrl = None def own_addr(self): if self.bssid is None: self.bssid = self.get_status_field('bssid[%d]' % self.bssidx) return self.bssid def own_mld_addr(self): if self.mld_addr is None: self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx) return self.mld_addr def get_addr(self, group=False): if self.own_mld_addr() is None: return self.own_addr() return self.own_mld_addr() def request(self, cmd): logger.debug(self.dbg + ": CTRL: " + cmd) return self.ctrl.request(cmd) def ping(self): return "PONG" in self.request("PING") def set(self, field, value): if "OK" not in self.request("SET " + field + " " + value): if "TKIP" in value and (field == "wpa_pairwise" or \ field == "rsn_pairwise"): raise utils.HwsimSkip("Cipher TKIP not supported") raise Exception("Failed to set hostapd parameter " + field) def set_defaults(self, set_channel=True): self.set("driver", "nl80211") if set_channel: self.set("hw_mode", "g") self.set("channel", "1") self.set("ieee80211n", "1") self.set("logger_stdout", "-1") self.set("logger_stdout_level", "0") def set_open(self, ssid): self.set_defaults() self.set("ssid", ssid) def set_wpa2_psk(self, ssid, passphrase): self.set_defaults() self.set("ssid", ssid) self.set("wpa_passphrase", passphrase) self.set("wpa", "2") self.set("wpa_key_mgmt", "WPA-PSK") self.set("rsn_pairwise", "CCMP") def set_wpa_psk(self, ssid, passphrase): self.set_defaults() self.set("ssid", ssid) self.set("wpa_passphrase", passphrase) self.set("wpa", "1") self.set("wpa_key_mgmt", "WPA-PSK") self.set("wpa_pairwise", "TKIP") def set_wpa_psk_mixed(self, ssid, passphrase): self.set_defaults() self.set("ssid", ssid) self.set("wpa_passphrase", passphrase) self.set("wpa", "3") self.set("wpa_key_mgmt", "WPA-PSK") self.set("wpa_pairwise", "TKIP") self.set("rsn_pairwise", "CCMP") def set_wep(self, ssid, key): self.set_defaults() self.set("ssid", ssid) self.set("wep_key0", key) def enable(self): if "OK" not in self.request("ENABLE"): raise Exception("Failed to enable hostapd interface " + self.ifname) def disable(self): if "OK" not in self.request("DISABLE"): raise Exception("Failed to disable hostapd interface " + self.ifname) def link_remove(self, count=10): if "OK" not in self.request("LINK_REMOVE %u" % count): raise Exception("Failed to remove hostapd link " + self.ifname) def dump_monitor(self): while self.mon.pending(): ev = self.mon.recv() logger.debug(self.dbg + ": " + ev) def wait_event(self, events, timeout): if not isinstance(events, list): raise Exception("Hostapd.wait_event() called with incorrect events argument type") start = os.times()[4] while True: while self.mon.pending(): ev = self.mon.recv() logger.debug(self.dbg + ": " + ev) for event in events: if event in ev: return ev now = os.times()[4] remaining = start + timeout - now if remaining <= 0: break if not self.mon.pending(timeout=remaining): break return None def wait_sta(self, addr=None, timeout=2, wait_4way_hs=False): ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout) if ev is None: raise Exception("AP did not report STA connection") if addr and addr not in ev: raise Exception("Unexpected STA address in connection event: " + ev) if wait_4way_hs: ev2 = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=timeout) if ev2 is None: raise Exception("AP did not report 4-way handshake completion") if addr and addr not in ev2: raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev2) return ev def wait_sta_disconnect(self, addr=None, timeout=2): ev = self.wait_event(["AP-STA-DISCONNECT"], timeout=timeout) if ev is None: raise Exception("AP did not report STA disconnection") if addr and addr not in ev: raise Exception("Unexpected STA address in disconnection event: " + ev) return ev def wait_4way_hs(self, addr=None, timeout=1): ev = self.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=timeout) if ev is None: raise Exception("hostapd did not report 4-way handshake completion") if addr and addr not in ev: raise Exception("Unexpected STA address in 4-way handshake completion event: " + ev) return ev def wait_ptkinitdone(self, addr, timeout=2): while timeout > 0: sta = self.get_sta(addr) if 'hostapdWPAPTKState' not in sta: raise Exception("GET_STA did not return hostapdWPAPTKState") state = sta['hostapdWPAPTKState'] if state == "11": return time.sleep(0.1) timeout -= 0.1 raise Exception("Timeout while waiting for PTKINITDONE") def get_status(self): res = self.request("STATUS") lines = res.splitlines() vals = dict() for l in lines: [name, value] = l.split('=', 1) vals[name] = value return vals def get_status_field(self, field): vals = self.get_status() if field in vals: return vals[field] return None def get_driver_status(self): res = self.request("STATUS-DRIVER") lines = res.splitlines() vals = dict() for l in lines: [name, value] = l.split('=', 1) vals[name] = value return vals def get_driver_status_field(self, field): vals = self.get_driver_status() if field in vals: return vals[field] return None def get_config(self): res = self.request("GET_CONFIG") lines = res.splitlines() vals = dict() for l in lines: [name, value] = l.split('=', 1) vals[name] = value return vals def mgmt_rx(self, timeout=5): ev = self.wait_event(["MGMT-RX"], timeout=timeout) if ev is None: return None msg = {} frame = binascii.unhexlify(ev.split(' ')[1]) msg['frame'] = frame hdr = struct.unpack('> 4) & 0xf hdr = hdr[1:] msg['duration'] = hdr[0] hdr = hdr[1:] msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] hdr = hdr[6:] msg['seq_ctrl'] = hdr[0] msg['payload'] = frame[24:] return msg def mgmt_tx(self, msg): t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,) hdr = struct.pack(' 1: vals[name_val[0]] = name_val[1] return vals def get_pmksa(self, addr): res = self.request("PMKSA") lines = res.splitlines() for l in lines: if addr not in l: continue vals = dict() [index, aa, pmkid, expiration, opportunistic] = l.split(' ') vals['index'] = index vals['pmkid'] = pmkid vals['expiration'] = expiration vals['opportunistic'] = opportunistic return vals return None def dpp_qr_code(self, uri): res = self.request("DPP_QR_CODE " + uri) if "FAIL" in res: raise Exception("Failed to parse QR Code URI") return int(res) def dpp_nfc_uri(self, uri): res = self.request("DPP_NFC_URI " + uri) if "FAIL" in res: raise Exception("Failed to parse NFC URI") return int(res) def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None, curve=None, key=None, supported_curves=None, host=None): cmd = "DPP_BOOTSTRAP_GEN type=" + type if chan: cmd += " chan=" + chan if mac: if mac is True: mac = self.own_addr() cmd += " mac=" + mac.replace(':', '') if info: cmd += " info=" + info if curve: cmd += " curve=" + curve if key: cmd += " key=" + key if supported_curves: cmd += " supported_curves=" + supported_curves if host: cmd += " host=" + host res = self.request(cmd) if "FAIL" in res: raise Exception("Failed to generate bootstrapping info") return int(res) def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None, extra=None): cmd = "DPP_BOOTSTRAP_SET %d" % id if ssid: cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode() if extra: cmd += " " + extra if conf: cmd += " conf=" + conf if configurator is not None: cmd += " configurator=%d" % configurator if "OK" not in self.request(cmd): raise Exception("Failed to set bootstrapping parameters") def dpp_listen(self, freq, netrole=None, qr=None, role=None): cmd = "DPP_LISTEN " + str(freq) if netrole: cmd += " netrole=" + netrole if qr: cmd += " qr=" + qr if role: cmd += " role=" + role if "OK" not in self.request(cmd): raise Exception("Failed to start listen operation") def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None, extra=None, own=None, role=None, neg_freq=None, ssid=None, passphrase=None, expect_fail=False, conn_status=False, nfc_uri=None): cmd = "DPP_AUTH_INIT" if peer is None: if nfc_uri: peer = self.dpp_nfc_uri(nfc_uri) else: peer = self.dpp_qr_code(uri) cmd += " peer=%d" % peer if own is not None: cmd += " own=%d" % own if role: cmd += " role=" + role if extra: cmd += " " + extra if conf: cmd += " conf=" + conf if configurator is not None: cmd += " configurator=%d" % configurator if neg_freq: cmd += " neg_freq=%d" % neg_freq if ssid: cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode() if passphrase: cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode() if conn_status: cmd += " conn_status=1" res = self.request(cmd) if expect_fail: if "FAIL" not in res: raise Exception("DPP authentication started unexpectedly") return if "OK" not in res: raise Exception("Failed to initiate DPP Authentication") def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None, extra=None, use_id=None, ver=None): if use_id is None: id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve) else: id1 = use_id cmd = "own=%d " % id1 if identifier: cmd += "identifier=%s " % identifier cmd += "init=1 " if ver is not None: cmd += "ver=" + str(ver) + " " if role: cmd += "role=%s " % role if extra: cmd += extra + " " cmd += "code=%s" % code res = self.request("DPP_PKEX_ADD " + cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (initiator)") return id1 def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None, listen_role=None): id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve) cmd = "own=%d " % id0 if identifier: cmd += "identifier=%s " % identifier cmd += "code=%s" % code res = self.request("DPP_PKEX_ADD " + cmd) if "FAIL" in res: raise Exception("Failed to set PKEX data (responder)") self.dpp_listen(freq, role=listen_role) def dpp_configurator_add(self, curve=None, key=None, net_access_key_curve=None): cmd = "DPP_CONFIGURATOR_ADD" if curve: cmd += " curve=" + curve if net_access_key_curve: cmd += " net_access_key_curve=" + curve if key: cmd += " key=" + key res = self.request(cmd) if "FAIL" in res: raise Exception("Failed to add configurator") return int(res) def dpp_configurator_remove(self, conf_id): res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id) if "OK" not in res: raise Exception("DPP_CONFIGURATOR_REMOVE failed") def note(self, txt): self.request("NOTE " + txt) def send_file(self, src, dst): self.host.send_file(src, dst) def get_ptksa(self, bssid, cipher): res = self.request("PTKSA_CACHE_LIST") lines = res.splitlines() for l in lines: if bssid not in l or cipher not in l: continue vals = dict() [index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5) vals['index'] = index vals['addr'] = addr vals['cipher'] = cipher vals['expiration'] = expiration vals['tk'] = tk vals['kdk'] = kdk return vals return None def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30, global_ctrl_override=None, driver=False, set_channel=True): if isinstance(apdev, dict): ifname = apdev['ifname'] try: hostname = apdev['hostname'] port = apdev['port'] if 'remote_cli' in apdev: remote_cli = apdev['remote_cli'] else: remote_cli = False if 'global_ctrl_override' in apdev: global_ctrl_override = apdev['global_ctrl_override'] logger.info("Starting AP " + hostname + "/" + port + " " + ifname + " remote_cli " + str(remote_cli)) except: logger.info("Starting AP " + ifname) hostname = None port = 8878 remote_cli = False else: ifname = apdev logger.info("Starting AP " + ifname + " (old add_ap argument type)") hostname = None port = 8878 remote_cli = False hapd_global = HostapdGlobal(apdev, global_ctrl_override=global_ctrl_override) hapd_global.remove(ifname) hapd_global.add(ifname, driver=driver) port = hapd_global.get_ctrl_iface_port(ifname) hapd = Hostapd(ifname, hostname=hostname, port=port, remote_cli=remote_cli) if not hapd.ping(): raise Exception("Could not ping hostapd") hapd.set_defaults(set_channel=set_channel) fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt", "wpa", "wpa_deny_ptk0_rekey", "wpa_pairwise", "rsn_pairwise", "auth_server_addr", "acct_server_addr", "osu_server_uri"] for field in fields: if field in params: hapd.set(field, params[field]) for f, v in list(params.items()): if f in fields: continue if isinstance(v, list): for val in v: hapd.set(f, val) else: hapd.set(f, v) if no_enable: return hapd hapd.enable() if wait_enabled: ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout) if ev is None: raise Exception("AP startup timed out") if "AP-ENABLED" not in ev: raise Exception("AP startup failed") return hapd def add_bss(apdev, ifname, confname, ignore_error=False): phy = utils.get_phy(apdev) try: hostname = apdev['hostname'] port = apdev['port'] if 'remote_cli' in apdev: remote_cli = apdev['remote_cli'] else: remote_cli = False logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname + " remote_cli=" + str(remote_cli)) except: logger.info("Starting BSS phy=" + phy + " ifname=" + ifname) hostname = None port = 8878 remote_cli = False hapd_global = HostapdGlobal(apdev) confname = cfg_file(apdev, confname, ifname) hapd_global.send_file(confname, confname) hapd_global.add_bss(phy, confname, ignore_error) port = hapd_global.get_ctrl_iface_port(ifname) hapd = Hostapd(ifname, hostname=hostname, port=port, remote_cli=remote_cli) if not hapd.ping(): raise Exception("Could not ping hostapd") return hapd def add_iface(apdev, confname): ifname = apdev['ifname'] try: hostname = apdev['hostname'] port = apdev['port'] if 'remote_cli' in apdev: remote_cli = apdev['remote_cli'] else: remote_cli = False logger.info("Starting interface " + hostname + "/" + port + " " + ifname + "remote_cli=" + str(remote_cli)) except: logger.info("Starting interface " + ifname) hostname = None port = 8878 remote_cli = False hapd_global = HostapdGlobal(apdev) confname = cfg_file(apdev, confname, ifname) hapd_global.send_file(confname, confname) hapd_global.add_iface(ifname, confname) port = hapd_global.get_ctrl_iface_port(ifname) hapd = Hostapd(ifname, hostname=hostname, port=port, remote_cli=remote_cli) if not hapd.ping(): raise Exception("Could not ping hostapd") return hapd def add_mld_link(apdev, link_id, params): if isinstance(apdev, dict): ifname = apdev['ifname'] try: hostname = apdev['hostname'] port = apdev['port'] logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname) except: logger.info("Adding link on: ifname=" + ifname) hostname = None port = 8878 else: ifname = apdev logger.info("Adding link on: ifname=" + ifname) hostname = None port = 8878 hapd_global = HostapdGlobal(apdev) confname, ctrl_iface = cfg_mld_link_file(ifname, params) hapd_global.send_file(confname, confname) try: hapd_global.add_link(ifname, confname) except Exception as e: if str(e) == "Could not add hostapd link": raise utils.HwsimSkip("No MLO support in hostapd") port = hapd_global.get_ctrl_iface_port(ifname) hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port, link=link_id) if not hapd.ping(): raise Exception("Could not ping hostapd") return hapd def remove_bss(apdev, ifname=None): if ifname == None: ifname = apdev['ifname'] try: hostname = apdev['hostname'] port = apdev['port'] logger.info("Removing BSS " + hostname + "/" + port + " " + ifname) except: logger.info("Removing BSS " + ifname) hapd_global = HostapdGlobal(apdev) hapd_global.remove(ifname) # wait little to make sure the AP stops beaconing time.sleep(0.1) def terminate(apdev): try: hostname = apdev['hostname'] port = apdev['port'] logger.info("Terminating hostapd " + hostname + "/" + port) except: logger.info("Terminating hostapd") hapd_global = HostapdGlobal(apdev) hapd_global.terminate() def wpa3_params(ssid=None, password=None, wpa_key_mgmt="SAE", ieee80211w="2"): params = {"wpa": "2", "wpa_key_mgmt": wpa_key_mgmt, "ieee80211w": ieee80211w, "rsn_pairwise": "CCMP"} if ssid: params["ssid"] = ssid if password: params["sae_password"] = password return params def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK", ieee80211w=None): params = {"wpa": "2", "wpa_key_mgmt": wpa_key_mgmt, "rsn_pairwise": "CCMP"} if ssid: params["ssid"] = ssid if passphrase: params["wpa_passphrase"] = passphrase if ieee80211w is not None: params["ieee80211w"] = ieee80211w return params def wpa_params(ssid=None, passphrase=None): params = {"wpa": "1", "wpa_key_mgmt": "WPA-PSK", "wpa_pairwise": "TKIP"} if ssid: params["ssid"] = ssid if passphrase: params["wpa_passphrase"] = passphrase return params def wpa_mixed_params(ssid=None, passphrase=None): params = {"wpa": "3", "wpa_key_mgmt": "WPA-PSK", "wpa_pairwise": "TKIP", "rsn_pairwise": "CCMP"} if ssid: params["ssid"] = ssid if passphrase: params["wpa_passphrase"] = passphrase return params def radius_params(): params = {"auth_server_addr": "127.0.0.1", "auth_server_port": "1812", "auth_server_shared_secret": "radius", "nas_identifier": "nas.w1.fi"} return params def wpa_eap_params(ssid=None): params = radius_params() params["wpa"] = "1" params["wpa_key_mgmt"] = "WPA-EAP" params["wpa_pairwise"] = "TKIP" params["ieee8021x"] = "1" if ssid: params["ssid"] = ssid return params def wpa2_eap_params(ssid=None): params = radius_params() params["wpa"] = "2" params["wpa_key_mgmt"] = "WPA-EAP" params["rsn_pairwise"] = "CCMP" params["ieee8021x"] = "1" if ssid: params["ssid"] = ssid return params def b_only_params(channel="1", ssid=None, country=None): params = {"hw_mode": "b", "channel": channel} if ssid: params["ssid"] = ssid if country: params["country_code"] = country return params def g_only_params(channel="1", ssid=None, country=None): params = {"hw_mode": "g", "channel": channel} if ssid: params["ssid"] = ssid if country: params["country_code"] = country return params def a_only_params(channel="36", ssid=None, country=None): params = {"hw_mode": "a", "channel": channel} if ssid: params["ssid"] = ssid if country: params["country_code"] = country return params def ht20_params(channel="1", ssid=None, country=None): params = {"ieee80211n": "1", "channel": channel, "hw_mode": "g"} if int(channel) > 14: params["hw_mode"] = "a" if ssid: params["ssid"] = ssid if country: params["country_code"] = country return params def ht40_plus_params(channel="1", ssid=None, country=None): params = ht20_params(channel, ssid, country) params['ht_capab'] = "[HT40+]" return params def ht40_minus_params(channel="1", ssid=None, country=None): params = ht20_params(channel, ssid, country) params['ht_capab'] = "[HT40-]" return params def he_params(ssid=None): params = {"ssid": "he6ghz", "ieee80211n": "1", "ieee80211ac": "1", "wmm_enabled": "1", "channel": "5", "op_class": "131", "ieee80211ax": "1", "hw_mode": "a", "he_oper_centr_freq_seg0_idx": "15", "he_oper_chwidth": "2", "vht_oper_chwidth": "2"} if ssid: params["ssid"] = ssid return params def he_wpa2_params(ssid=None, wpa_key_mgmt="SAE", rsn_pairwise="CCMP", group_cipher="CCMP", sae_pwe="1", passphrase=None): params = he_params(ssid) params["wpa"] = "2" params["wpa_key_mgmt"] = wpa_key_mgmt params["rsn_pairwise"] = rsn_pairwise params["group_cipher"] = group_cipher params["ieee80211w"] = "2" if "SAE" in wpa_key_mgmt: params["sae_pwe"] = sae_pwe params["sae_groups"] = "19" if passphrase: params["wpa_passphrase"] = passphrase return params def cmd_execute(apdev, cmd, shell=False): hapd_global = HostapdGlobal(apdev) return hapd_global.cmd_execute(cmd, shell=shell) def send_file(apdev, src, dst): hapd_global = HostapdGlobal(apdev) return hapd_global.send_file(src, dst) def acl_file(dev, apdev, conf): fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-') f = os.fdopen(fd, 'w') if conf == 'hostapd.macaddr': mac0 = dev[0].get_status_field("address") f.write(mac0 + '\n') f.write("02:00:00:00:00:12\n") f.write("02:00:00:00:00:34\n") f.write("-02:00:00:00:00:12\n") f.write("-02:00:00:00:00:34\n") f.write("01:01:01:01:01:01\n") f.write("03:01:01:01:01:03\n") elif conf == 'hostapd.accept': mac0 = dev[0].get_status_field("address") mac1 = dev[1].get_status_field("address") f.write(mac0 + " 1\n") f.write(mac1 + " 2\n") elif conf == 'hostapd.accept2': mac0 = dev[0].get_status_field("address") mac1 = dev[1].get_status_field("address") mac2 = dev[2].get_status_field("address") f.write(mac0 + " 1\n") f.write(mac1 + " 2\n") f.write(mac2 + " 3\n") else: f.close() os.unlink(filename) return conf return filename def bssid_inc(apdev, inc=1): parts = apdev['bssid'].split(':') parts[5] = '%02x' % (int(parts[5], 16) + int(inc)) bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]) return bssid def cfg_file(apdev, conf, ifname=None): match = re.search(r'^bss-.+', conf) if match: # put cfg file in /tmp directory fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-') f = os.fdopen(fd, 'w') idx = ''.join(filter(str.isdigit, conf.split('-')[-1])) if ifname is None: ifname = apdev['ifname'] if idx != '1': ifname = ifname + '-' + idx f.write("driver=nl80211\n") f.write("ctrl_interface=/var/run/hostapd\n") f.write("hw_mode=g\n") f.write("channel=1\n") f.write("ieee80211n=1\n") if conf.startswith('bss-ht40-'): f.write("ht_capab=[HT40+]\n") f.write("interface=%s\n" % ifname) f.write("ssid=bss-%s\n" % idx) if conf == 'bss-2-dup.conf': bssid = apdev['bssid'] else: bssid = bssid_inc(apdev, int(idx) - 1) f.write("bssid=%s\n" % bssid) return fname return conf idx = 0 def cfg_mld_link_file(ifname, params): global idx ctrl_iface="/var/run/hostapd" conf = "link-%d.conf" % idx fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-') f = os.fdopen(fd, 'w') f.write("ctrl_interface=%s\n" % ctrl_iface) f.write("driver=nl80211\n") f.write("ieee80211n=1\n") if 'hw_mode' in params and params['hw_mode'] == 'a' and \ ('op_class' not in params or \ int(params['op_class']) not in [131, 132, 133, 134, 135, 136, 137]): f.write("ieee80211ac=1\n") f.write("ieee80211ax=1\n") f.write("ieee80211be=1\n") f.write("interface=%s\n" % ifname) f.write("mld_ap=1\n") for k, v in list(params.items()): f.write("{}={}\n".format(k,v)) idx = idx + 1 return fname, ctrl_iface