tests: Allow WlantestCapture to be used with context managers

In addition, convert many of the uses to do so.

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2024-03-17 16:27:54 +02:00
parent 91282e29ff
commit ab719621b7
5 changed files with 78 additions and 89 deletions

View file

@ -996,16 +996,12 @@ def test_ap_open_layer_2_update(dev, apdev, params):
cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
wt = WlantestCapture(ifname, cap)
time.sleep(1)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
hapd.wait_sta()
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
wt.close()
with WlantestCapture(ifname, cap):
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
hapd.wait_sta()
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
# Check for Layer 2 Update frame and unexpected frames from the station
# that did not fully complete authentication.

View file

@ -3437,45 +3437,41 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK"
hapd = hostapd.add_ap(apdev[0], params)
wt = WlantestCapture(ifname, cap)
time.sleep(1)
with WlantestCapture(ifname, cap):
bssid = hapd.own_addr().replace(':', '')
bssid = hapd.own_addr().replace(':', '')
hapd.request("SET ext_mgmt_frame_handling 1")
addr = "021122334455"
auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
if "OK" not in res:
raise Exception("MGMT_RX_PROCESS failed")
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
ev = ev.replace("ok=0", "ok=1")
cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
if "OK" not in hapd.request(cmd):
raise Exception("MGMT_TX_STATUS_PROCESS failed")
hapd.request("SET ext_mgmt_frame_handling 1")
addr = "021122334455"
auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
if "OK" not in res:
raise Exception("MGMT_RX_PROCESS failed")
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
ev = ev.replace("ok=0", "ok=1")
cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
if "OK" not in hapd.request(cmd):
raise Exception("MGMT_TX_STATUS_PROCESS failed")
assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
if "OK" not in res:
raise Exception("MGMT_RX_PROCESS failed")
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
ev = ev.replace("ok=0", "ok=1")
cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
if "OK" not in hapd.request(cmd):
raise Exception("MGMT_TX_STATUS_PROCESS failed")
hapd.request("SET ext_mgmt_frame_handling 0")
assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
if "OK" not in res:
raise Exception("MGMT_RX_PROCESS failed")
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No TX status seen")
ev = ev.replace("ok=0", "ok=1")
cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
if "OK" not in hapd.request(cmd):
raise Exception("MGMT_TX_STATUS_PROCESS failed")
hapd.request("SET ext_mgmt_frame_handling 0")
dev[0].connect(ssid, psk="12345678", scan_freq="2412")
hapd.wait_sta()
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
wt.close()
dev[0].connect(ssid, psk="12345678", scan_freq="2412")
hapd.wait_sta()
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
# Check for Layer 2 Update frame and unexpected frames from the station

View file

@ -5781,8 +5781,12 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False,
check_dpp_capab(dev[1], min_ver=2)
cap_lo = params['prefix'] + ".lo.pcap"
wt = WlantestCapture('lo', cap_lo)
with WlantestCapture('lo', cap_lo):
run_dpp_controller_relay2(dev, apdev, params, chirp, discover,
duplicate)
def run_dpp_controller_relay2(dev, apdev, params, chirp=False, discover=False,
duplicate=False):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
@ -5892,9 +5896,6 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False,
dev[0].wait_connected()
relay.wait_sta()
time.sleep(0.5)
wt.close()
def test_dpp_controller_init_through_relay(dev, apdev, params):
"""DPP Controller initiating through Relay"""
try:
@ -5925,8 +5926,12 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False,
check_dpp_capab(dev[1], min_ver=2)
cap_lo = os.path.join(params['prefix'], ".lo.pcap")
wt = WlantestCapture('lo', cap_lo)
with WlantestCapture('lo', cap_lo):
run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic,
add)
def run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic=False,
add=False):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
@ -6004,9 +6009,6 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False,
if add:
relay.request("DPP_RELAY_REMOVE_CONTROLLER 127.0.0.1")
time.sleep(0.5)
wt.close()
class MyTCPServer(TCPServer):
def __init__(self, addr, handler):
self.allow_reuse_address = True
@ -6098,9 +6100,10 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False):
check_dpp_capab(dev0)
check_dpp_capab(dev1)
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
with WlantestCapture('lo', cap_lo):
run_dpp_tcp2(dev0, dev1, cap_lo, port, mutual)
def run_dpp_tcp2(dev0, dev1, cap_lo, port=None, mutual=False):
# Controller
conf_id = dev1.dpp_configurator_add()
dev1.set("dpp_configurator_params",
@ -6154,8 +6157,6 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False):
wait_auth_success(dev1, dev0, configurator=dev1, enrollee=dev0,
allow_enrollee_failure=True,
allow_configurator_failure=True)
time.sleep(0.5)
wt.close()
def test_dpp_tcp_conf_init(dev, apdev, params):
"""DPP over TCP (Configurator initiates)"""
@ -6175,9 +6176,10 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
check_dpp_capab(dev0, min_ver=2)
check_dpp_capab(dev1, min_ver=2)
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
with WlantestCapture('lo', cap_lo):
run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port, conf)
def run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
id_c = dev1.dpp_bootstrap_gen()
uri_c = dev1.request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
res = dev1.request("DPP_BOOTSTRAP_INFO %d" % id_c)
@ -6194,8 +6196,6 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
wait_auth_success(dev1, dev0, configurator=dev0, enrollee=dev1,
allow_enrollee_failure=True,
allow_configurator_failure=True)
time.sleep(0.5)
wt.close()
def test_dpp_tcp_controller_management_hostapd(dev, apdev, params):
"""DPP Controller management in hostapd"""
@ -7570,9 +7570,10 @@ def run_dpp_enterprise_tcp(dev, apdev, params):
cap_lo = params['prefix'] + ".lo.pcap"
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
with WlantestCapture('lo', cap_lo) as wt:
_run_dpp_enterprise_tcp(dev, apdev, params, wt)
def _run_dpp_enterprise_tcp(dev, apdev, params, wt):
# Controller
conf_id = dev[1].dpp_configurator_add()
csrattrs = "MAsGCSqGSIb3DQEJBw=="
@ -7641,9 +7642,6 @@ def run_dpp_enterprise_tcp_end(params, dev, wt):
if "DPP-CONF-RECEIVED" not in ev:
raise Exception("DPP configuration did not succeed (Enrollee)")
time.sleep(0.5)
wt.close()
def test_dpp_enterprise_tcp2(dev, apdev, params):
"""DPP over TCP for enterprise provisioning (Controller initiating)"""
if not openssl_imported:
@ -7660,21 +7658,11 @@ def run_dpp_enterprise_tcp2(dev, apdev, params):
check_dpp_capab(dev[1])
cap_lo = params['prefix'] + ".lo.pcap"
cert_file = params['prefix'] + ".cert.pem"
pkcs7_file = params['prefix'] + ".pkcs7.der"
with open("auth_serv/ec-ca.pem", "rb") as f:
res = f.read()
cacert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
res)
with open("auth_serv/ec-ca.key", "rb") as f:
res = f.read()
cakey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, res)
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
with WlantestCapture('lo', cap_lo) as wt:
_run_dpp_enterprise_tcp2(dev, apdev, params, wt)
def _run_dpp_enterprise_tcp2(dev, apdev, params, wt):
# Client/Enrollee/Responder
id_e = dev[0].dpp_bootstrap_gen()
uri_e = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id_e)

View file

@ -59,9 +59,10 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False):
check_dpp_capab(dev0, min_ver=3)
check_dpp_capab(dev1, min_ver=3)
wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
with WlantestCapture('lo', cap_lo):
run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae, status)
def run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae=False, status=False):
# Controller
if sae:
ssid = binascii.hexlify("sae".encode()).decode()
@ -99,9 +100,6 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False):
if 'wait_conn_status' not in res or not res['wait_conn_status']:
raise Exception("wait_conn_status not reported")
time.sleep(0.5)
wt.close()
def test_dpp_tcp_pkex(dev, apdev, params):
"""DPP/PKEXv2 over TCP"""
prefix = "dpp_tcp_pkex"
@ -221,8 +219,10 @@ def run_dpp_controller_relay_pkex(dev, apdev, params):
prefix = "dpp_controller_relay_pkex"
cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
wt = WlantestCapture('lo', cap_lo)
with WlantestCapture('lo', cap_lo):
run_dpp_controller_relay_pkex2(dev, apdev, params)
def run_dpp_controller_relay_pkex2(dev, apdev, params):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
@ -277,9 +277,6 @@ def run_dpp_controller_relay_pkex(dev, apdev, params):
dev[0].wait_connected()
dev[0].dump_monitor()
time.sleep(0.5)
wt.close()
def dpp_pb_ap(apdev):
params = {"ssid": "sae",
"dpp_configurator_connectivity": "1",

View file

@ -259,12 +259,24 @@ class WlantestCapture:
self.cmd = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(1)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
time.sleep(0.5)
self.close()
time.sleep(0.5)
def __del__(self):
if self.cmd:
print("WlantestCapture.__del__ needed to run close()")
self.close()
def close(self):
if not self.cmd:
return
logger.debug("wlantest[%s] stopping" % self.ifname)
self.cmd.terminate()
res = self.cmd.communicate()