From ab719621b746f44fd9cef72a2a304d17ee694023 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Sun, 17 Mar 2024 16:27:54 +0200 Subject: [PATCH] tests: Allow WlantestCapture to be used with context managers In addition, convert many of the uses to do so. Signed-off-by: Jouni Malinen --- tests/hwsim/test_ap_open.py | 16 ++++----- tests/hwsim/test_ap_psk.py | 68 +++++++++++++++++-------------------- tests/hwsim/test_dpp.py | 56 ++++++++++++------------------ tests/hwsim/test_dpp3.py | 15 ++++---- tests/hwsim/wlantest.py | 12 +++++++ 5 files changed, 78 insertions(+), 89 deletions(-) diff --git a/tests/hwsim/test_ap_open.py b/tests/hwsim/test_ap_open.py index fcec87210..8c8a9c444 100644 --- a/tests/hwsim/test_ap_open.py +++ b/tests/hwsim/test_ap_open.py @@ -996,16 +996,12 @@ def test_ap_open_layer_2_update(dev, apdev, params): cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap") hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) - wt = WlantestCapture(ifname, cap) - time.sleep(1) - - dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") - hapd.wait_sta() - hwsim_utils.test_connectivity(dev[0], hapd) - time.sleep(1) - hwsim_utils.test_connectivity(dev[0], hapd) - time.sleep(0.5) - wt.close() + with WlantestCapture(ifname, cap): + dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") + hapd.wait_sta() + hwsim_utils.test_connectivity(dev[0], hapd) + time.sleep(1) + hwsim_utils.test_connectivity(dev[0], hapd) # Check for Layer 2 Update frame and unexpected frames from the station # that did not fully complete authentication. diff --git a/tests/hwsim/test_ap_psk.py b/tests/hwsim/test_ap_psk.py index 157b863d7..9e34e7b6d 100644 --- a/tests/hwsim/test_ap_psk.py +++ b/tests/hwsim/test_ap_psk.py @@ -3437,45 +3437,41 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params): params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678") params["wpa_key_mgmt"] = "WPA-PSK" hapd = hostapd.add_ap(apdev[0], params) - wt = WlantestCapture(ifname, cap) - time.sleep(1) + with WlantestCapture(ifname, cap): + bssid = hapd.own_addr().replace(':', '') - bssid = hapd.own_addr().replace(':', '') + hapd.request("SET ext_mgmt_frame_handling 1") + addr = "021122334455" + auth = "b0003a01" + bssid + addr + bssid + '1000000001000000' + res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth) + if "OK" not in res: + raise Exception("MGMT_RX_PROCESS failed") + ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) + if ev is None: + raise Exception("No TX status seen") + ev = ev.replace("ok=0", "ok=1") + cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4])) + if "OK" not in hapd.request(cmd): + raise Exception("MGMT_TX_STATUS_PROCESS failed") - hapd.request("SET ext_mgmt_frame_handling 1") - addr = "021122334455" - auth = "b0003a01" + bssid + addr + bssid + '1000000001000000' - res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth) - if "OK" not in res: - raise Exception("MGMT_RX_PROCESS failed") - ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) - if ev is None: - raise Exception("No TX status seen") - ev = ev.replace("ok=0", "ok=1") - cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4])) - if "OK" not in hapd.request(cmd): - raise Exception("MGMT_TX_STATUS_PROCESS failed") + assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000' + res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc) + if "OK" not in res: + raise Exception("MGMT_RX_PROCESS failed") + ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) + if ev is None: + raise Exception("No TX status seen") + ev = ev.replace("ok=0", "ok=1") + cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4])) + if "OK" not in hapd.request(cmd): + raise Exception("MGMT_TX_STATUS_PROCESS failed") + hapd.request("SET ext_mgmt_frame_handling 0") - assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000' - res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc) - if "OK" not in res: - raise Exception("MGMT_RX_PROCESS failed") - ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) - if ev is None: - raise Exception("No TX status seen") - ev = ev.replace("ok=0", "ok=1") - cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4])) - if "OK" not in hapd.request(cmd): - raise Exception("MGMT_TX_STATUS_PROCESS failed") - hapd.request("SET ext_mgmt_frame_handling 0") - - dev[0].connect(ssid, psk="12345678", scan_freq="2412") - hapd.wait_sta() - hwsim_utils.test_connectivity(dev[0], hapd) - time.sleep(1) - hwsim_utils.test_connectivity(dev[0], hapd) - time.sleep(0.5) - wt.close() + dev[0].connect(ssid, psk="12345678", scan_freq="2412") + hapd.wait_sta() + hwsim_utils.test_connectivity(dev[0], hapd) + time.sleep(1) + hwsim_utils.test_connectivity(dev[0], hapd) time.sleep(0.5) # Check for Layer 2 Update frame and unexpected frames from the station diff --git a/tests/hwsim/test_dpp.py b/tests/hwsim/test_dpp.py index af8eb0540..518983bd0 100644 --- a/tests/hwsim/test_dpp.py +++ b/tests/hwsim/test_dpp.py @@ -5781,8 +5781,12 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False, check_dpp_capab(dev[1], min_ver=2) cap_lo = params['prefix'] + ".lo.pcap" - wt = WlantestCapture('lo', cap_lo) + with WlantestCapture('lo', cap_lo): + run_dpp_controller_relay2(dev, apdev, params, chirp, discover, + duplicate) +def run_dpp_controller_relay2(dev, apdev, params, chirp=False, discover=False, + duplicate=False): # Controller conf_id = dev[1].dpp_configurator_add() dev[1].set("dpp_configurator_params", @@ -5892,9 +5896,6 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False, dev[0].wait_connected() relay.wait_sta() - time.sleep(0.5) - wt.close() - def test_dpp_controller_init_through_relay(dev, apdev, params): """DPP Controller initiating through Relay""" try: @@ -5925,8 +5926,12 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False, check_dpp_capab(dev[1], min_ver=2) cap_lo = os.path.join(params['prefix'], ".lo.pcap") - wt = WlantestCapture('lo', cap_lo) + with WlantestCapture('lo', cap_lo): + run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic, + add) +def run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic=False, + add=False): # Controller conf_id = dev[1].dpp_configurator_add() dev[1].set("dpp_configurator_params", @@ -6004,9 +6009,6 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False, if add: relay.request("DPP_RELAY_REMOVE_CONTROLLER 127.0.0.1") - time.sleep(0.5) - wt.close() - class MyTCPServer(TCPServer): def __init__(self, addr, handler): self.allow_reuse_address = True @@ -6098,9 +6100,10 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False): check_dpp_capab(dev0) check_dpp_capab(dev1) - wt = WlantestCapture('lo', cap_lo) - time.sleep(1) + with WlantestCapture('lo', cap_lo): + run_dpp_tcp2(dev0, dev1, cap_lo, port, mutual) +def run_dpp_tcp2(dev0, dev1, cap_lo, port=None, mutual=False): # Controller conf_id = dev1.dpp_configurator_add() dev1.set("dpp_configurator_params", @@ -6154,8 +6157,6 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False): wait_auth_success(dev1, dev0, configurator=dev1, enrollee=dev0, allow_enrollee_failure=True, allow_configurator_failure=True) - time.sleep(0.5) - wt.close() def test_dpp_tcp_conf_init(dev, apdev, params): """DPP over TCP (Configurator initiates)""" @@ -6175,9 +6176,10 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"): check_dpp_capab(dev0, min_ver=2) check_dpp_capab(dev1, min_ver=2) - wt = WlantestCapture('lo', cap_lo) - time.sleep(1) + with WlantestCapture('lo', cap_lo): + run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port, conf) +def run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port=None, conf="sta-dpp"): id_c = dev1.dpp_bootstrap_gen() uri_c = dev1.request("DPP_BOOTSTRAP_GET_URI %d" % id_c) res = dev1.request("DPP_BOOTSTRAP_INFO %d" % id_c) @@ -6194,8 +6196,6 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"): wait_auth_success(dev1, dev0, configurator=dev0, enrollee=dev1, allow_enrollee_failure=True, allow_configurator_failure=True) - time.sleep(0.5) - wt.close() def test_dpp_tcp_controller_management_hostapd(dev, apdev, params): """DPP Controller management in hostapd""" @@ -7570,9 +7570,10 @@ def run_dpp_enterprise_tcp(dev, apdev, params): cap_lo = params['prefix'] + ".lo.pcap" - wt = WlantestCapture('lo', cap_lo) - time.sleep(1) + with WlantestCapture('lo', cap_lo) as wt: + _run_dpp_enterprise_tcp(dev, apdev, params, wt) +def _run_dpp_enterprise_tcp(dev, apdev, params, wt): # Controller conf_id = dev[1].dpp_configurator_add() csrattrs = "MAsGCSqGSIb3DQEJBw==" @@ -7641,9 +7642,6 @@ def run_dpp_enterprise_tcp_end(params, dev, wt): if "DPP-CONF-RECEIVED" not in ev: raise Exception("DPP configuration did not succeed (Enrollee)") - time.sleep(0.5) - wt.close() - def test_dpp_enterprise_tcp2(dev, apdev, params): """DPP over TCP for enterprise provisioning (Controller initiating)""" if not openssl_imported: @@ -7660,21 +7658,11 @@ def run_dpp_enterprise_tcp2(dev, apdev, params): check_dpp_capab(dev[1]) cap_lo = params['prefix'] + ".lo.pcap" - cert_file = params['prefix'] + ".cert.pem" - pkcs7_file = params['prefix'] + ".pkcs7.der" - with open("auth_serv/ec-ca.pem", "rb") as f: - res = f.read() - cacert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, - res) - - with open("auth_serv/ec-ca.key", "rb") as f: - res = f.read() - cakey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, res) - - wt = WlantestCapture('lo', cap_lo) - time.sleep(1) + with WlantestCapture('lo', cap_lo) as wt: + _run_dpp_enterprise_tcp2(dev, apdev, params, wt) +def _run_dpp_enterprise_tcp2(dev, apdev, params, wt): # Client/Enrollee/Responder id_e = dev[0].dpp_bootstrap_gen() uri_e = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id_e) diff --git a/tests/hwsim/test_dpp3.py b/tests/hwsim/test_dpp3.py index 0fde418ba..276ec64da 100644 --- a/tests/hwsim/test_dpp3.py +++ b/tests/hwsim/test_dpp3.py @@ -59,9 +59,10 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False): check_dpp_capab(dev0, min_ver=3) check_dpp_capab(dev1, min_ver=3) - wt = WlantestCapture('lo', cap_lo) - time.sleep(1) + with WlantestCapture('lo', cap_lo): + run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae, status) +def run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae=False, status=False): # Controller if sae: ssid = binascii.hexlify("sae".encode()).decode() @@ -99,9 +100,6 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False): if 'wait_conn_status' not in res or not res['wait_conn_status']: raise Exception("wait_conn_status not reported") - time.sleep(0.5) - wt.close() - def test_dpp_tcp_pkex(dev, apdev, params): """DPP/PKEXv2 over TCP""" prefix = "dpp_tcp_pkex" @@ -221,8 +219,10 @@ def run_dpp_controller_relay_pkex(dev, apdev, params): prefix = "dpp_controller_relay_pkex" cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap") - wt = WlantestCapture('lo', cap_lo) + with WlantestCapture('lo', cap_lo): + run_dpp_controller_relay_pkex2(dev, apdev, params) +def run_dpp_controller_relay_pkex2(dev, apdev, params): # Controller conf_id = dev[1].dpp_configurator_add() dev[1].set("dpp_configurator_params", @@ -277,9 +277,6 @@ def run_dpp_controller_relay_pkex(dev, apdev, params): dev[0].wait_connected() dev[0].dump_monitor() - time.sleep(0.5) - wt.close() - def dpp_pb_ap(apdev): params = {"ssid": "sae", "dpp_configurator_connectivity": "1", diff --git a/tests/hwsim/wlantest.py b/tests/hwsim/wlantest.py index 16765d27a..d15595b2b 100644 --- a/tests/hwsim/wlantest.py +++ b/tests/hwsim/wlantest.py @@ -259,12 +259,24 @@ class WlantestCapture: self.cmd = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + time.sleep(1) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + time.sleep(0.5) + self.close() + time.sleep(0.5) def __del__(self): if self.cmd: + print("WlantestCapture.__del__ needed to run close()") self.close() def close(self): + if not self.cmd: + return logger.debug("wlantest[%s] stopping" % self.ifname) self.cmd.terminate() res = self.cmd.communicate()