# Python class for controlling wlantest
# Copyright (c) 2013-2019, Jouni Malinen
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import re
import os
import posixpath
import time
import subprocess
import logging
import wpaspy

logger = logging.getLogger()


class Wlantest:
    """Wrapper around the wlantest sniffer and its wlantest_cli tool.

    Class-level attributes hold the (optional) remote host on which
    wlantest is run. When no remote host has been registered, commands
    are issued through a local wlantest_cli binary instead. setup() must
    be called before any instance is created.
    """

    # Host object registered with register_remote_wlantest(), or None for
    # the local flow.
    remote_host = None
    # Dict of parameters for the remote flow; the keys read in this class
    # are 'log_dir', 'wlantest', 'wlantest_cli' and 'tc_name'.
    setup_params = None
    # Handle returned by remote_host.thread_run() while wlantest is running.
    exe_thread = None
    # Result list handed to remote_host.thread_run().
    exe_res = []
    # Module used by chan_from_wpa() to configure the monitor interface.
    monitor_mod = None
    # Set by setup(); checked by __init__().
    setup_done = False

    @classmethod
    def stop_remote_wlantest(cls):
        """Kill the remote wlantest process and wait for its thread.

        Does nothing when no remote wlantest thread has been started.
        """
        if cls.exe_thread is None:
            # Local flow - no need for remote operations
            return

        cls.remote_host.execute(["killall", "-9", "wlantest"])
        cls.remote_host.thread_wait(cls.exe_thread, 5)
        cls.exe_thread = None
        cls.exe_res = []

    @classmethod
    def reset_remote_wlantest(cls):
        """Stop remote wlantest (if running) and clear all class state."""
        cls.stop_remote_wlantest()
        cls.remote_host = None
        cls.setup_params = None
        cls.exe_thread = None
        cls.exe_res = []
        cls.monitor_mod = None
        cls.setup_done = False

    @classmethod
    def start_remote_wlantest(cls):
        """Start wlantest on the registered remote host.

        Does nothing when no remote host is registered. The log and pcapng
        file names are derived from the test case name, the host name and
        the first interface name of the remote host, and both files are
        registered with the host through add_log().

        Raises:
            Exception: if remote wlantest has already been started.
        """
        if cls.remote_host is None:
            # Local flow - no need for remote operations
            return

        if cls.exe_thread is not None:
            raise Exception("Cannot start wlantest twice")

        log_dir = cls.setup_params['log_dir']
        # The host's ifname may list several interfaces; use the first one.
        ifname = re.split('; | |, ', cls.remote_host.ifname)[0]
        exe = cls.setup_params["wlantest"]
        tc_name = cls.setup_params["tc_name"]
        base_log_name = "_".join([tc_name, "wlantest",
                                  cls.remote_host.name, ifname])
        log_file = posixpath.join(log_dir, base_log_name + ".log")
        pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")

        # Build the argument list directly instead of formatting a string
        # and splitting it on whitespace, so that a path or test case name
        # containing whitespace stays a single argument.
        cmd = [exe, "-i", ifname, "-n", pcap_file, "-c", "-dtN",
               "-L", log_file]

        cls.remote_host.add_log(log_file)
        cls.remote_host.add_log(pcap_file)
        cls.exe_thread = cls.remote_host.thread_run(cmd, cls.exe_res)

        # Give wlantest a chance to start working
        time.sleep(1)

    @classmethod
    def register_remote_wlantest(cls, host, setup_params, monitor_mod):
        """Register the remote host that will run wlantest.

        Stores the host, parameters and monitor module on the class and
        verifies with "which" that both the wlantest and wlantest_cli
        executables named in setup_params can be found on the host.

        Raises:
            Exception: if a remote host is already registered or if either
                executable is not found on the host.
        """
        if cls.remote_host is not None:
            raise Exception("Cannot register remote wlantest twice")

        cls.remote_host = host
        cls.setup_params = setup_params
        cls.monitor_mod = monitor_mod

        status, buf = host.execute(["which", setup_params['wlantest']])
        if status != 0:
            raise Exception(host.name + " - wlantest: " + buf)

        status, buf = host.execute(["which", setup_params['wlantest_cli']])
        if status != 0:
            raise Exception(host.name + " - wlantest_cli: " + buf)

    @classmethod
    def chan_from_wpa(cls, wpa, is_p2p=False):
        """Configure the monitor on the remote host from a wpa object.

        Returns None when no monitor module is registered; otherwise
        returns the result of the monitor module's setup() call.
        """
        if cls.monitor_mod is None:
            return

        m = cls.monitor_mod
        return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])

    @classmethod
    def setup(cls, wpa, is_p2p=False):
        """Prepare wlantest for use and mark the class as set up.

        When wpa is truthy the monitor is configured from it first. Remote
        wlantest is then started (a no-op in the local flow).
        """
        if wpa:
            cls.chan_from_wpa(wpa, is_p2p)
        cls.start_remote_wlantest()
        cls.setup_done = True

    def __init__(self):
        """Select the local wlantest_cli binary to use.

        Raises:
            Exception: if setup() has not been called yet.
        """
        if not self.setup_done:
            raise Exception("Cannot create Wlantest instance before setup()")
        # Prefer the binary from the source tree when it exists.
        if os.path.isfile('../../wlantest/wlantest_cli'):
            self.wlantest_cli = '../../wlantest/wlantest_cli'
        else:
            self.wlantest_cli = 'wlantest_cli'

    def cli_cmd(self, params):
        """Run wlantest_cli with the given argument list; return its output.

        Uses the registered remote host when there is one and the local
        wlantest_cli binary otherwise.

        Raises:
            Exception: if the remote command returns a non-zero status.
            subprocess.CalledProcessError: if the local command fails.
        """
        if self.remote_host is None:
            output = subprocess.check_output([self.wlantest_cli] + params)
            return output.decode()

        exe = self.setup_params["wlantest_cli"]
        status, buf = self.remote_host.execute([exe] + params)[:2]
        if status != 0:
            raise Exception("wlantest_cli failed")
        return buf

    def flush(self):
        """Issue the "flush" command; raise Exception on FAIL."""
        res = self.cli_cmd(["flush"])
        if "FAIL" in res:
            raise Exception("wlantest_cli flush failed")

    def relog(self):
        """Issue the "relog" command; raise Exception on FAIL."""
        res = self.cli_cmd(["relog"])
        if "FAIL" in res:
            raise Exception("wlantest_cli relog failed")

    def add_passphrase(self, passphrase):
        """Issue "add_passphrase" for passphrase; raise Exception on FAIL."""
        res = self.cli_cmd(["add_passphrase", passphrase])
        if "FAIL" in res:
            raise Exception("wlantest_cli add_passphrase failed")

    def add_wepkey(self, key):
        """Issue "add_wepkey" for key; raise Exception on FAIL."""
        res = self.cli_cmd(["add_wepkey", key])
        if "FAIL" in res:
            # Name the command that was actually issued.
            raise Exception("wlantest_cli add_wepkey failed")

    def info_bss(self, field, bssid):
        """Return the "info_bss" output for field of the given BSS.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["info_bss", field, bssid])
        if "FAIL" in res:
            raise Exception("Could not get BSS info from wlantest for " + bssid)
        return res

    def get_bss_counter(self, field, bssid):
        """Return the BSS counter named field as an int.

        Unlike the other counter getters, this one returns 0 instead of
        raising when the command fails or reports FAIL.
        """
        try:
            res = self.cli_cmd(["get_bss_counter", field, bssid])
        except Exception:
            # Best effort: a failed command is reported as a zero counter.
            return 0
        if "FAIL" in res:
            return 0
        return int(res)

    def clear_bss_counters(self, bssid):
        """Issue "clear_bss_counters" for bssid; the result is not checked."""
        self.cli_cmd(["clear_bss_counters", bssid])

    def info_sta(self, field, bssid, addr):
        """Return the "info_sta" output for field of the given STA.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["info_sta", field, bssid, addr])
        if "FAIL" in res:
            raise Exception("Could not get STA info from wlantest for " + addr)
        return res

    def get_sta_counter(self, field, bssid, addr):
        """Return the STA counter named field as an int.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
        if "FAIL" in res:
            raise Exception("wlantest_cli command failed")
        return int(res)

    def clear_sta_counters(self, bssid, addr):
        """Issue "clear_sta_counters"; raise Exception on FAIL."""
        res = self.cli_cmd(["clear_sta_counters", bssid, addr])
        if "FAIL" in res:
            raise Exception("wlantest_cli command failed")

    def tdls_clear(self, bssid, addr1, addr2):
        """Issue "clear_tdls_counters"; the result is not checked."""
        self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])

    def get_tdls_counter(self, field, bssid, addr1, addr2):
        """Return the TDLS counter named field as an int.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
        if "FAIL" in res:
            raise Exception("wlantest_cli command failed")
        return int(res)

    def require_ap_pmf_mandatory(self, bssid):
        """Raise unless the AP requires PMF and uses a SHA256-based AKM.

        Checks that rsn_capab contains both MFPR and MFPC and that
        key_mgmt contains PSK-SHA256.
        """
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" not in res:
            raise Exception("AP did not require PMF")
        if "MFPC" not in res:
            raise Exception("AP did not enable PMF")
        res = self.info_bss("key_mgmt", bssid)
        if "PSK-SHA256" not in res:
            raise Exception("AP did not enable SHA256-based AKM for PMF")

    def require_ap_pmf_optional(self, bssid):
        """Raise unless the AP enables PMF (MFPC) without requiring it."""
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" in res:
            raise Exception("AP required PMF")
        if "MFPC" not in res:
            raise Exception("AP did not enable PMF")

    def require_ap_no_pmf(self, bssid):
        """Raise if the AP requires (MFPR) or enables (MFPC) PMF."""
        res = self.info_bss("rsn_capab", bssid)
        if "MFPR" in res:
            raise Exception("AP required PMF")
        if "MFPC" in res:
            raise Exception("AP enabled PMF")

    def require_sta_pmf_mandatory(self, bssid, addr):
        """Raise unless the STA both requires (MFPR) and enables (MFPC) PMF."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPR" not in res:
            raise Exception("STA did not require PMF")
        if "MFPC" not in res:
            raise Exception("STA did not enable PMF")

    def require_sta_pmf(self, bssid, addr):
        """Raise unless the STA enables PMF (MFPC)."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPC" not in res:
            raise Exception("STA did not enable PMF")

    def require_sta_no_pmf(self, bssid, addr):
        """Raise if the STA enables PMF (MFPC)."""
        res = self.info_sta("rsn_capab", bssid, addr)
        if "MFPC" in res:
            raise Exception("STA enabled PMF")

    def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
        """Raise unless key_mgmt appears in the STA's key_mgmt info."""
        res = self.info_sta("key_mgmt", bssid, addr)
        if key_mgmt not in res:
            raise Exception("Unexpected STA key_mgmt")

    def get_tx_tid(self, bssid, addr, tid):
        """Return the "get_tx_tid" counter for tid as an int.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
        if "FAIL" in res:
            raise Exception("wlantest_cli command failed")
        return int(res)

    def get_rx_tid(self, bssid, addr, tid):
        """Return the "get_rx_tid" counter for tid as an int.

        Raises:
            Exception: if the output contains FAIL.
        """
        res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
        if "FAIL" in res:
            raise Exception("wlantest_cli command failed")
        return int(res)

    def get_tid_counters(self, bssid, addr):
        """Return [tx, rx] dicts mapping each tid in 0..16 to its counter."""
        tx = {}
        rx = {}
        for tid in range(17):
            tx[tid] = self.get_tx_tid(bssid, addr, tid)
            rx[tid] = self.get_rx_tid(bssid, addr, tid)
        return [tx, rx]


class WlantestCapture:
    """Run a local wlantest process for the lifetime of the object.

    The process is started in the constructor and stopped by close(),
    which is also called on leaving a "with" block.
    """

    def __init__(self, ifname, output, netns=None):
        """Start wlantest on ifname, passing output as the -w argument.

        When netns is given, the process is started through
        "ip netns exec <netns>".
        """
        self.cmd = None
        self.ifname = ifname
        # Prefer the binary from the source tree when it exists.
        if os.path.isfile('../../wlantest/wlantest'):
            binary = '../../wlantest/wlantest'
        else:
            binary = 'wlantest'
        logger.debug("wlantest[%s] starting", ifname)
        args = [binary, '-e', '-i', ifname, '-w', output]
        if netns:
            args = ['ip', 'netns', 'exec', netns] + args
        self.cmd = subprocess.Popen(args,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
        # Same start-up delay as used for the remote wlantest.
        time.sleep(1)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        time.sleep(0.5)
        self.close()
        time.sleep(0.5)

    def __del__(self):
        # Only reached with a live process if close() was never called.
        if self.cmd:
            print("WlantestCapture.__del__ needed to run close()")
            self.close()

    def close(self):
        """Terminate wlantest and log whatever it wrote to stdout/stderr.

        Safe to call more than once; later calls do nothing.
        """
        if not self.cmd:
            return
        logger.debug("wlantest[%s] stopping", self.ifname)
        self.cmd.terminate()
        stdout, stderr = self.cmd.communicate()
        if stdout:
            logger.debug("wlantest[%s] stdout: %s", self.ifname,
                         stdout.decode().strip())
        if stderr:
            logger.debug("wlantest[%s] stderr: %s", self.ifname,
                         stderr.decode().strip())
        self.cmd = None