tests: Replace str.encode('hex') with binascii.hexlify() for python3

Signed-off-by: Masashi Honma <masashi.honma@gmail.com>
Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Masashi Honma 2019-02-02 18:19:35 +02:00 committed by Jouni Malinen
parent 1c48c9bcf9
commit 54c58f29c0
12 changed files with 111 additions and 81 deletions

View file

@ -2047,13 +2047,13 @@ def test_ap_wpa2_eap_tls_blob(dev, apdev):
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
hapd = hostapd.add_ap(apdev[0], params)
cert = read_pem("auth_serv/ca.pem")
if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")):
if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()):
raise Exception("Could not set cacert blob")
cert = read_pem("auth_serv/user.pem")
if "OK" not in dev[0].request("SET blob usercert " + cert.encode("hex")):
if "OK" not in dev[0].request("SET blob usercert " + binascii.hexlify(cert).decode()):
raise Exception("Could not set usercert blob")
key = read_pem("auth_serv/user.rsa-key")
if "OK" not in dev[0].request("SET blob userkey " + key.encode("hex")):
if "OK" not in dev[0].request("SET blob userkey " + binascii.hexlify(key).decode()):
raise Exception("Could not set cacert blob")
eap_connect(dev[0], hapd, "TLS", "tls user", ca_cert="blob://cacert",
client_cert="blob://usercert",
@ -2128,10 +2128,10 @@ def test_ap_wpa2_eap_tls_pkcs12_blob(dev, apdev):
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
hapd = hostapd.add_ap(apdev[0], params)
cert = read_pem("auth_serv/ca.pem")
if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")):
if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()):
raise Exception("Could not set cacert blob")
with open("auth_serv/user.pkcs12", "rb") as f:
if "OK" not in dev[0].request("SET blob pkcs12 " + f.read().encode("hex")):
if "OK" not in dev[0].request("SET blob pkcs12 " + binascii.hexlify(f.read()).decode()):
raise Exception("Could not set pkcs12 blob")
eap_connect(dev[0], hapd, "TLS", "tls user", ca_cert="blob://cacert",
private_key="blob://pkcs12",
@ -2143,7 +2143,7 @@ def test_ap_wpa2_eap_tls_neg_incorrect_trust_root(dev, apdev):
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
hostapd.add_ap(apdev[0], params)
cert = read_pem("auth_serv/ca-incorrect.pem")
if "OK" not in dev[0].request("SET blob cacert " + cert.encode("hex")):
if "OK" not in dev[0].request("SET blob cacert " + binascii.hexlify(cert).decode()):
raise Exception("Could not set cacert blob")
dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS",
identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
@ -3421,7 +3421,7 @@ def test_ap_wpa2_eap_fast_text_pac_errors(dev, apdev):
pac += "START\n"
pac += "PAC-Type\n"
pac += "END\n"
if "OK" not in dev[0].request("SET blob fast_pac_text_errors " + pac.encode("hex")):
if "OK" not in dev[0].request("SET blob fast_pac_text_errors " + binascii.hexlify(pac.encode()).decode()):
raise Exception("Could not set blob")
dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="FAST",
@ -4760,7 +4760,7 @@ def test_ap_wpa2_eap_ttls_dh_params_blob(dev, apdev):
params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
hapd = hostapd.add_ap(apdev[0], params)
dh = read_pem("auth_serv/dh2.conf")
if "OK" not in dev[0].request("SET blob dhparams " + dh.encode("hex")):
if "OK" not in dev[0].request("SET blob dhparams " + binascii.hexlify(dh).decode()):
raise Exception("Could not set dhparams blob")
eap_connect(dev[0], hapd, "TTLS", "pap user",
anonymous_identity="ttls", password="password",

View file

@ -2417,9 +2417,9 @@ def test_ap_ft_reassoc_proto(dev, apdev):
# FT: R0KH-ID in FTIE did not match with the current R0KH-ID
tests += [ rsne + mde + "3755" + fte[4:168] + "0301ff" ]
# FT: No R1KH-ID subelem in FTIE
tests += [ rsne + mde + "375e" + fte[4:168] + "030a" + "nas1.w1.fi".encode("hex") ]
tests += [ rsne + mde + "375e" + fte[4:168] + "030a" + binascii.hexlify(b"nas1.w1.fi").decode() ]
# FT: Unknown R1KH-ID used in ReassocReq
tests += [ rsne + mde + "3766" + fte[4:168] + "030a" + "nas1.w1.fi".encode("hex") + "0106000000000000" ]
tests += [ rsne + mde + "3766" + fte[4:168] + "030a" + binascii.hexlify(b"nas1.w1.fi").decode() + "0106000000000000" ]
# FT: PMKID in Reassoc Req did not match with the PMKR1Name derived from auth request
tests += [ rsne[:-32] + 16*"00" + mde + fte ]
# Invalid MIC in FTIE

View file

@ -351,13 +351,13 @@ def test_ap_nai_home_realm_query(dev, apdev):
if len(nai1) >= len(nai2):
raise Exception("Unexpected NAI Realm list response lengths")
if "example.com".encode('hex') not in nai1:
if binascii.hexlify(b"example.com").decode() not in nai1:
raise Exception("Home realm not reported")
if "example.org".encode('hex') in nai1:
if binascii.hexlify(b"example.org").decode() in nai1:
raise Exception("Non-home realm reported")
if "example.com".encode('hex') not in nai2:
if binascii.hexlify(b"example.com").decode() not in nai2:
raise Exception("Home realm not reported in wildcard query")
if "example.org".encode('hex') not in nai2:
if binascii.hexlify(b"example.org").decode() not in nai2:
raise Exception("Non-home realm not reported in wildcard query ")
cmds = [ "foo",

View file

@ -177,7 +177,7 @@ def test_ap_wps_init_through_wps_config(dev, apdev):
ssid = "test-wps-init-config"
hapd = hostapd.add_ap(apdev[0],
{ "ssid": ssid, "eap_server": "1", "wps_state": "1" })
if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"12345678").decode()):
raise Exception("WPS_CONFIG command failed")
ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
if ev is None:
@ -200,7 +200,7 @@ def test_ap_wps_init_through_wps_config_2(dev, apdev):
hapd = hostapd.add_ap(apdev[0],
{ "ssid": ssid, "eap_server": "1", "wps_state": "1",
"wps_cred_processing": "2" })
if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"12345678").decode()):
raise Exception("WPS_CONFIG command failed")
ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
if ev is None:
@ -214,7 +214,7 @@ def test_ap_wps_invalid_wps_config_passphrase(dev, apdev):
ssid = "test-wps-init-config"
hapd = hostapd.add_ap(apdev[0],
{ "ssid": ssid, "eap_server": "1", "wps_state": "1" })
if "FAIL" not in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "1234567".encode("hex")):
if "FAIL" not in hapd.request("WPS_CONFIG " + binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(b"1234567").decode()):
raise Exception("Invalid WPS_CONFIG command accepted")
def test_ap_wps_conf(dev, apdev):
@ -771,7 +771,7 @@ def test_ap_wps_reg_config_ext_processing(dev, apdev):
if "1026" not in ev:
raise Exception("AP Settings missing from event")
hapd.request("SET wps_cred_processing 0")
if "FAIL" in hapd.request("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex")):
if "FAIL" in hapd.request("WPS_CONFIG " + binascii.hexlify(new_ssid.encode()).decode() + " WPA2PSK CCMP " + binascii.hexlify(new_passphrase.encode()).decode()):
raise Exception("WPS_CONFIG command failed")
dev[0].wait_connected(timeout=15)
@ -1572,8 +1572,8 @@ def _test_ap_wps_er_config_ap(dev, apdev):
raise Exception("Expected AP UUID not found")
new_passphrase = "1234567890"
dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
ssid.encode("hex") + " WPA2PSK CCMP " +
new_passphrase.encode("hex"))
binascii.hexlify(ssid.encode()).decode() + " WPA2PSK CCMP " +
binascii.hexlify(new_passphrase.encode()).decode())
ev = dev[0].wait_event(["WPS-SUCCESS"])
if ev is None:
raise Exception("WPS ER configuration operation timed out")

View file

@ -1115,7 +1115,7 @@ def test_dpp_config_fragmentation(dev, apdev):
def test_dpp_config_legacy_gen(dev, apdev):
"""Generate DPP Config Object for legacy network"""
run_dpp_qr_code_auth_unicast(dev, apdev, "prime256v1",
init_extra="conf=sta-psk pass=%s" % "passphrase".encode("hex"),
init_extra="conf=sta-psk pass=%s" % binascii.hexlify(b"passphrase").decode(),
require_conf_success=True)
def test_dpp_config_legacy_gen_psk(dev, apdev):
@ -1707,12 +1707,14 @@ def test_dpp_gas_timeout(dev, apdev):
# DPP Authentication Request
msg = dev[0].mgmt_rx()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
# DPP Authentication Confirmation
msg = dev[0].mgmt_rx()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[0].wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
@ -1724,7 +1726,8 @@ def test_dpp_gas_timeout(dev, apdev):
# DPP Configuration Response (GAS Initial Response frame)
msg = dev[0].mgmt_rx()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
# GAS Comeback Response frame
@ -2266,7 +2269,9 @@ def run_dpp_auto_connect_legacy(dev, apdev, conf='sta-psk',
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
cmd = "DPP_AUTH_INIT peer=%d conf=%s ssid=%s pass=%s" % (id1, conf, "dpp-legacy".encode("hex"), "secret passphrase".encode("hex"))
cmd = "DPP_AUTH_INIT peer=%d conf=%s ssid=%s pass=%s" % (id1, conf,
binascii.hexlify(b"dpp-legacy").decode(),
binascii.hexlify(b"secret passphrase").decode())
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to initiate DPP Authentication")
ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=10)
@ -2317,7 +2322,9 @@ def run_dpp_auto_connect_legacy_pmf_required(dev, apdev):
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s pass=%s" % (id1, "dpp-legacy".encode("hex"), "secret passphrase".encode("hex"))
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s pass=%s" % (id1,
binascii.hexlify(b"dpp-legacy").decode(),
binascii.hexlify(b"secret passphrase").decode())
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to initiate DPP Authentication")
ev = dev[1].wait_event(["DPP-CONF-SENT"], timeout=10)
@ -5149,7 +5156,8 @@ def test_dpp_keygen_configurator_error(dev, apdev):
def rx_process_frame(dev):
msg = dev.mgmt_rx()
if "OK" not in dev.request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], msg['frame'].encode('hex'))):
if "OK" not in dev.request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
msg['freq'], msg['datarate'], msg['ssi_signal'], binascii.hexlify(msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
def wait_auth_success(responder, initiator):
@ -5238,7 +5246,7 @@ def test_dpp_gas_comeback_after_failure(dev, apdev):
# DPP Configuration Request (GAS Comeback Request frame)
msg = dev[0].mgmt_rx()
frame = msg['frame'].encode('hex')
frame = binascii.hexlify(msg['frame']).decode()
with alloc_fail(dev[0], 1, "gas_build_comeback_resp;gas_server_handle_rx_comeback_req"):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
raise Exception("MGMT_RX_PROCESS failed")
@ -5269,26 +5277,29 @@ def test_dpp_gas(dev, apdev):
msg = dev[0].mgmt_rx()
# Protected Dual of GAS Initial Request frame (dropped by GAS server)
frame = msg['frame'].encode('hex')
frame = frame[0:48] + "09" + frame[50:]
if msg == None:
raise Exception("MGMT_RX_PROCESS failed. <Please retry>")
frame = binascii.hexlify(msg['frame'])
frame = frame[0:48] + b"09" + frame[50:]
frame = frame.decode()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
raise Exception("MGMT_RX_PROCESS failed")
with alloc_fail(dev[0], 1, "gas_server_send_resp"):
frame = msg['frame'].encode('hex')
frame = binascii.hexlify(msg['frame']).decode()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
raise Exception("MGMT_RX_PROCESS failed")
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
with alloc_fail(dev[0], 1, "gas_build_initial_resp;gas_server_send_resp"):
frame = msg['frame'].encode('hex')
frame = binascii.hexlify(msg['frame']).decode()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
raise Exception("MGMT_RX_PROCESS failed")
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
# Add extra data after Query Request field to trigger
# "GAS: Ignored extra data after Query Request field"
frame = msg['frame'].encode('hex') + "00"
frame = binascii.hexlify(msg['frame']).decode() + "00"
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
raise Exception("MGMT_RX_PROCESS failed")
@ -5314,7 +5325,7 @@ def test_dpp_truncated_attr(dev, apdev):
frame = msg['frame']
# DPP: Truncated message - not enough room for the attribute - dropped
frame1 = frame[0:36].encode('hex')
frame1 = binascii.hexlify(frame[0:36]).decode()
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame1)):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[0].wait_event(["DPP-RX"], timeout=5)
@ -5322,7 +5333,7 @@ def test_dpp_truncated_attr(dev, apdev):
raise Exception("Invalid attribute error not reported")
# DPP: Unexpected octets (3) after the last attribute
frame2 = frame.encode('hex') + "000000"
frame2 = binascii.hexlify(frame).decode() + "000000"
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame2)):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[0].wait_event(["DPP-RX"], timeout=5)
@ -5425,7 +5436,7 @@ def test_dpp_invalid_legacy_params(dev, apdev):
id1 = int(res)
# No pass/psk
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s" % (id1, "dpp-legacy".encode("hex"))
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk ssid=%s" % (id1, binascii.hexlify(b"dpp-legacy").decode())
if "FAIL" not in dev[1].request(cmd):
raise Exception("Invalid command not rejected")
@ -5448,7 +5459,7 @@ def test_dpp_invalid_legacy_params2(dev, apdev):
id1 = int(res)
dev[0].set("dpp_configurator_params",
" conf=sta-psk ssid=%s" % ("dpp-legacy".encode("hex")))
" conf=sta-psk ssid=%s" % (binascii.hexlify(b"dpp-legacy").decode()))
cmd = "DPP_LISTEN 2412 role=configurator"
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
@ -5483,7 +5494,9 @@ def test_dpp_legacy_params_failure(dev, apdev):
if "OK" not in dev[0].request("DPP_LISTEN 2412"):
raise Exception("Failed to start listen operation")
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk pass=%s ssid=%s" % (id1, "passphrase".encode("hex"), "dpp-legacy".encode("hex"))
cmd = "DPP_AUTH_INIT peer=%d conf=sta-psk pass=%s ssid=%s" % (id1,
binascii.hexlify(b"passphrase").decode(),
binascii.hexlify(b"dpp-legacy").decode())
with alloc_fail(dev[1], 1, "dpp_build_conf_obj_legacy"):
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to initiate DPP")
@ -5819,7 +5832,7 @@ def run_dpp_network_addition_failure(dev, apdev):
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
dev[0].dump_monitor()
cmd = "DPP_CONFIGURATOR_SIGN conf=sta-psk pass=%s configurator=%d" % ("passphrase".encode("hex"), conf_id)
cmd = "DPP_CONFIGURATOR_SIGN conf=sta-psk pass=%s configurator=%d" % (binascii.hexlify(b"passphrase").decode(), conf_id)
tests = [ (1, "wpa_config_set_quoted;wpas_dpp_add_network") ]
for count,func in tests:
with alloc_fail(dev[0], count, func):

View file

@ -5,6 +5,7 @@
# See README for more details.
from remotehost import remote_compatible
import binascii
import time
import subprocess
import logging
@ -154,7 +155,10 @@ def test_nfc_wps_password_token_ap(dev, apdev):
dev[0].dump_monitor()
new_ssid = "test-wps-nfc-pw-token-new-ssid"
new_passphrase = "1234567890"
res = dev[0].request("WPS_REG " + apdev[0]['bssid'] + " nfc-pw " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex"))
res = dev[0].request("WPS_REG " + apdev[0]['bssid'] + " nfc-pw " +
binascii.hexlify(new_ssid.encode()).decode() +
" WPA2PSK CCMP " +
binascii.hexlify(new_passphrase.encode()).decode())
if "FAIL" in res:
raise Exception("Failed to start Registrar using NFC password token")
dev[0].wait_connected(timeout=30)

View file

@ -1083,7 +1083,7 @@ def test_rrm_beacon_req_table_ssid(dev, apdev):
addr = dev[0].own_addr()
bssid2 = hapd2.own_addr()
token = run_req_beacon(hapd, addr, "51000000000002ffffffffffff" + "0007" + "another".encode('hex'))
token = run_req_beacon(hapd, addr, "51000000000002ffffffffffff" + "0007" + binascii.hexlify(b"another").decode())
ev = hapd.wait_event(["BEACON-RESP-RX"], timeout=10)
if ev is None:
raise Exception("Beacon report response not received")

View file

@ -726,7 +726,7 @@ def test_sae_proto_confirm_replay(dev, apdev):
hdr = "b0003a01" + bssid + addr + bssid + "1000"
hapd.dump_monitor()
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + req['frame'].encode('hex'))
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
logger.info("Confirm")
for i in range(0, 10):
@ -740,10 +740,10 @@ def test_sae_proto_confirm_replay(dev, apdev):
raise Exception("Authentication frame (confirm) not received")
hapd.dump_monitor()
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + req['frame'].encode('hex'))
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
logger.info("Replay Confirm")
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + req['frame'].encode('hex'))
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
logger.info("Association Request")
for i in range(0, 10):
@ -757,7 +757,7 @@ def test_sae_proto_confirm_replay(dev, apdev):
raise Exception("Association Request frame not received")
hapd.dump_monitor()
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + req['frame'].encode('hex'))
hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + binascii.hexlify(req['frame']).decode())
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("Management frame TX status not reported (1)")

View file

@ -27,6 +27,9 @@ def check_sigma_dut():
if not os.path.exists("./sigma_dut"):
raise HwsimSkip("sigma_dut not available")
def to_hex(s):
return binascii.hexlify(s.encode()).decode()
def sigma_dut_cmd(cmd, port=9000, timeout=2):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
socket.IPPROTO_TCP)
@ -1027,12 +1030,12 @@ def test_sigma_dut_dpp_qr_init_enrollee(dev, apdev):
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
dev[1].set("dpp_configurator_params",
" conf=sta-dpp ssid=%s configurator=%d" % ("DPPNET01".encode("hex"), conf_id))
" conf=sta-dpp ssid=%s configurator=%d" % (to_hex("DPPNET01"), conf_id))
cmd = "DPP_LISTEN 2437 role=configurator"
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1093,7 +1096,7 @@ def run_sigma_dut_dpp_qr_mutual_init_enrollee_check(dev, apdev, extra=''):
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
dev[1].set("dpp_configurator_params",
" conf=sta-dpp ssid=%s configurator=%d" % ("DPPNET01".encode("hex"), conf_id))
" conf=sta-dpp ssid=%s configurator=%d" % (to_hex("DPPNET01"), conf_id))
cmd = "DPP_LISTEN 2437 role=configurator qr=mutual"
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
@ -1110,7 +1113,7 @@ def run_sigma_dut_dpp_qr_mutual_init_enrollee_check(dev, apdev, extra=''):
raise Exception("Failed to parse QR Code URI")
id1 = int(res)
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1124,7 +1127,7 @@ def run_sigma_dut_dpp_qr_mutual_init_enrollee_check(dev, apdev, extra=''):
def dpp_init_conf_mutual(dev, id1, conf_id, own_id=None):
time.sleep(1)
logger.info("Starting DPP initiator/configurator in a thread")
cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp ssid=%s configurator=%d" % (id1, "DPPNET01".encode("hex"), conf_id)
cmd = "DPP_AUTH_INIT peer=%d conf=sta-dpp ssid=%s configurator=%d" % (id1, to_hex("DPPNET01"), conf_id)
if own_id is not None:
cmd += " own=%d" % own_id
if "OK" not in dev.request(cmd):
@ -1194,7 +1197,7 @@ def run_sigma_dut_dpp_qr_mutual_resp_enrollee(dev, apdev, extra=None):
raise Exception("Failed to parse QR Code URI")
id1 = int(res)
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1216,7 +1219,8 @@ def run_sigma_dut_dpp_qr_mutual_resp_enrollee(dev, apdev, extra=None):
def dpp_resp_conf_mutual(dev, conf_id, uri):
logger.info("Starting DPP responder/configurator in a thread")
dev.set("dpp_configurator_params",
" conf=sta-dpp ssid=%s configurator=%d" % ("DPPNET01".encode("hex"), conf_id))
" conf=sta-dpp ssid=%s configurator=%d" % (to_hex("DPPNET01"),
conf_id))
cmd = "DPP_LISTEN 2437 role=configurator qr=mutual"
if "OK" not in dev.request(cmd):
raise Exception("Failed to initiate DPP listen")
@ -1293,7 +1297,7 @@ def run_sigma_dut_dpp_qr_mutual_init_enrollee(dev, apdev, resp_pending):
raise Exception("Failed to parse QR Code URI")
uri = None
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1339,12 +1343,12 @@ def test_sigma_dut_dpp_qr_init_enrollee_psk(dev, apdev):
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
dev[1].set("dpp_configurator_params",
" conf=sta-psk ssid=%s pass=%s configurator=%d" % ("DPPNET01".encode("hex"), "ThisIsDppPassphrase".encode("hex"), conf_id))
" conf=sta-psk ssid=%s pass=%s configurator=%d" % (to_hex("DPPNET01"), to_hex("ThisIsDppPassphrase"), conf_id))
cmd = "DPP_LISTEN 2437 role=configurator"
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1387,12 +1391,12 @@ def test_sigma_dut_dpp_qr_init_enrollee_sae(dev, apdev):
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
dev[1].set("dpp_configurator_params",
" conf=sta-sae ssid=%s pass=%s configurator=%d" % ("DPPNET01".encode("hex"), "ThisIsDppPassphrase".encode("hex"), conf_id))
" conf=sta-sae ssid=%s pass=%s configurator=%d" % (to_hex("DPPNET01"), to_hex("ThisIsDppPassphrase"), conf_id))
cmd = "DPP_LISTEN 2437 role=configurator"
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1458,7 +1462,7 @@ def run_sigma_dut_dpp_qr_init_configurator(dev, apdev, conf_idx,
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1501,7 +1505,7 @@ def test_sigma_dut_dpp_incompatible_roles_init(dev, apdev):
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1551,7 +1555,7 @@ def test_sigma_dut_dpp_incompatible_roles_resp(dev, apdev):
id0 = int(res)
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1607,7 +1611,7 @@ def test_sigma_dut_ap_dpp_qr(dev, apdev, params):
def test_sigma_dut_ap_dpp_qr_legacy(dev, apdev, params):
"""sigma_dut controlled AP (legacy)"""
run_sigma_dut_ap_dpp_qr(dev, apdev, params, "ap-psk", "sta-psk",
extra="pass=%s" % "qwertyuiop".encode("hex"))
extra="pass=%s" % to_hex("qwertyuiop"))
def test_sigma_dut_ap_dpp_qr_legacy_psk(dev, apdev, params):
"""sigma_dut controlled AP (legacy)"""
@ -1804,7 +1808,7 @@ def run_sigma_dut_dpp_proto_initiator(dev, step, frame, attr, result, fail):
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1901,7 +1905,7 @@ def run_sigma_dut_dpp_proto_stop_at_initiator(dev, frame, result, fail):
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -1948,7 +1952,7 @@ def run_sigma_dut_dpp_proto_stop_at_initiator_enrollee(dev, frame, result,
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -2159,12 +2163,13 @@ def init_sigma_dut_dpp_proto_peer_disc_req(dev, apdev):
uri0 = dev[1].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
dev[1].set("dpp_configurator_params",
" conf=sta-dpp ssid=%s configurator=%d" % ("DPPNET01".encode("hex"), conf_id))
" conf=sta-dpp ssid=%s configurator=%d" % (to_hex("DPPNET01"),
conf_id))
cmd = "DPP_LISTEN 2437 role=configurator"
if "OK" not in dev[1].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri0.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri0))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -2199,7 +2204,7 @@ def test_sigma_dut_dpp_self_config(dev, apdev):
id = int(res)
uri = hapd.request("DPP_BOOTSTRAP_GET_URI %d" % id)
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
@ -2250,7 +2255,7 @@ def run_sigma_dut_ap_dpp_self_config(dev, apdev):
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % uri.encode('hex'))
res = sigma_dut_cmd("dev_exec_action,program,DPP,DPPActionType,SetPeerBootstrap,DPPBootstrappingdata,%s,DPPBS,QR" % to_hex(uri))
if "status,COMPLETE" not in res:
raise Exception("dev_exec_action did not succeed: " + res)
cmd = "dev_exec_action,program,DPP,DPPActionType,AutomaticDPP,DPPAuthRole,Initiator,DPPAuthDirection,Single,DPPProvisioningRole,Configurator,DPPConfIndex,1,DPPSigningKeyECC,P-256,DPPConfEnrolleeRole,STA,DPPBS,QR,DPPTimeout,6"

View file

@ -11,6 +11,7 @@ import os
import socket
import subprocess
import time
import binascii
import hostapd
import hwsim_utils
@ -872,7 +873,7 @@ def test_wpas_ctrl_disallow_aps(dev, apdev):
raise Exception("Unexpected BSSID")
dev[0].dump_monitor()
if "OK" not in dev[0].request("SET disallow_aps ssid " + "test".encode("hex")):
if "OK" not in dev[0].request("SET disallow_aps ssid " + binascii.hexlify(b"test").decode()):
raise Exception("Failed to set disallow_aps")
dev[0].wait_disconnected(timeout=5, error="Disconnection not seen")
ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=1)

View file

@ -11,6 +11,7 @@ import struct
import subprocess
import time
import json
import binascii
import hwsim_utils
import hostapd
@ -666,7 +667,8 @@ def test_wpas_mesh_secure_dropped_frame(dev, apdev):
if rx_msg['subtype'] == 13:
logger.info("Drop the first Action frame")
break
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
dev[0].request("SET ext_mgmt_frame_handling 0")
@ -1988,7 +1990,8 @@ def test_mesh_missing_mic(dev, apdev):
# Remove MIC
rx_msg['frame'] = frame[0:pos]
remove_mic = False
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
if ev:
@ -2057,7 +2060,8 @@ def test_mesh_pmkid_mismatch(dev, apdev):
# not match calculated PMKID)"
rx_msg['frame'] = frame[0:pos + 6] + b'\x00\x00\x00\x00' + frame[pos + 10:]
break_pmkid = False
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
if ev:
@ -2140,7 +2144,8 @@ def test_mesh_peering_proto(dev, apdev):
# "MPM: Mesh parsing rejected frame"
rx_msg['frame'] = frame[0:pos] + b'\x75\x00\x00\x00' + frame[pos + 6:]
test += 1
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
ev = dev[1].wait_event(["MESH-PEER-CONNECTED"], timeout=0.01)
if ev:
@ -2254,8 +2259,8 @@ def test_mesh_holding(dev, apdev):
if categ != 0x0f or action != 0x03:
raise Exception("Did not see Mesh Peering Close")
peer_lid = payload[-6:-4].encode("hex")
my_lid = payload[-4:-2].encode("hex")
peer_lid = binascii.hexlify(payload[-6:-4]).decode()
my_lid = binascii.hexlify(payload[-4:-2]).decode()
# Drop Mesh Peering Close and instead, process an unexpected Mesh Peering
# Open to trigger transmission of another Mesh Peering Close in the HOLDING
@ -2297,12 +2302,13 @@ def test_mesh_cnf_rcvd_event_cls_acpt(dev, apdev):
rx_msg = dev[0].mgmt_rx()
# Allow Mesh Peering Confirm to go through
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], rx_msg['frame'].encode('hex'))):
if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(
rx_msg['freq'], rx_msg['datarate'], rx_msg['ssi_signal'], binascii.hexlify(rx_msg['frame']).decode())):
raise Exception("MGMT_RX_PROCESS failed")
payload = rx_msg['payload']
peer_lid = payload[51:53].encode("hex")
my_lid = payload[53:55].encode("hex")
peer_lid = binascii.hexlify(payload[51:53]).decode()
my_lid = binascii.hexlify(payload[53:55]).decode()
dst = addr0.replace(':', '')
src = addr1.replace(':', '')
@ -2339,7 +2345,7 @@ def test_mesh_opn_snt_event_cls_acpt(dev, apdev):
payload = rx_msg['payload']
peer_lid = "0000"
my_lid = payload[53:55].encode("hex")
my_lid = binascii.hexlify(payload[53:55]).decode()
dst = addr0.replace(':', '')
src = addr1.replace(':', '')

View file

@ -1140,8 +1140,9 @@ class WpaSupplicant:
self.dump_monitor()
if new_ssid:
self.request("WPS_REG " + bssid + " " + pin + " " +
new_ssid.encode("hex") + " " + key_mgmt + " " +
cipher + " " + new_passphrase.encode("hex"))
binascii.hexlify(new_ssid.encode()).decode() + " " +
key_mgmt + " " + cipher + " " +
binascii.hexlify(new_passphrase.encode()).decode())
if no_wait:
return
ev = self.wait_event(["WPS-SUCCESS"], timeout=15)