tests: sigma_dut and DPP push button first on Enrollee

Signed-off-by: Jouni Malinen <quic_jouni@quicinc.com>
This commit is contained in:
Jouni Malinen 2022-08-23 18:56:17 +03:00 committed by Jouni Malinen
parent d72302c6b6
commit d68946d510

View file

@ -59,32 +59,49 @@ def sigma_log_output(cmd):
sigma_prog = None
def sigma_dut_cmd(cmd, port=9000, timeout=2):
def sigma_dut_cmd(cmd, port=9000, timeout=2, dump_dev=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
socket.IPPROTO_TCP)
sock.settimeout(timeout)
sock.settimeout(1 if dump_dev else timeout)
addr = ('127.0.0.1', port)
sock.connect(addr)
sock.send(cmd.encode() + b"\r\n")
try:
res = sock.recv(1000).decode()
running = False
done = False
for line in res.splitlines():
if line.startswith("status,RUNNING"):
running = True
elif line.startswith("status,INVALID") or \
line.startswith("status,ERROR") or \
line.startswith("status,COMPLETE"):
done = True
res = line
break
if running and not done:
# Read the actual response
running = False
done = False
if dump_dev:
for i in range(timeout):
dump_dev.dump_monitor()
try:
res = sock.recv(1000).decode()
for line in res.splitlines():
if line.startswith("status,RUNNING"):
running = True
elif line.startswith("status,INVALID") or \
line.startswith("status,ERROR") or \
line.startswith("status,COMPLETE"):
done = True
res = line
break
except socket.timeout as e:
pass
if (not dump_dev) or (running and not done):
try:
res = sock.recv(1000).decode()
except:
res = ''
pass
for line in res.splitlines():
if line.startswith("status,RUNNING"):
running = True
elif line.startswith("status,INVALID") or \
line.startswith("status,ERROR") or \
line.startswith("status,COMPLETE"):
done = True
res = line
break
if running and not done:
# Read the actual response
res = sock.recv(1000).decode()
except:
res = ''
pass
sock.close()
res = res.rstrip()
logger.debug("sigma_dut: '%s' --> '%s'" % (cmd, res))
@ -4195,6 +4212,44 @@ def test_sigma_dut_dpp_pb_sta(dev, apdev):
stop_sigma_dut(sigma)
dev[0].set("dpp_config_processing", "0")
def dpp_ap_pb_delayed_start(hapd):
time.sleep(10)
if "OK" not in hapd.request("DPP_PUSH_BUTTON"):
raise Exception("Failed to press push button on the AP")
def test_sigma_dut_dpp_pb_sta_first(dev, apdev):
"""sigma_dut DPP/PB station first"""
check_dpp_capab(dev[0], min_ver=3)
check_sae_capab(dev[0])
params = {"ssid": "sae",
"wpa": "2",
"wpa_key_mgmt": "SAE",
"ieee80211w": "2",
"rsn_pairwise": "CCMP",
"sae_password": "sae-password"}
hapd = hostapd.add_ap(apdev[0], params)
ifname = dev[0].ifname
sigma = start_sigma_dut(ifname)
try:
t = threading.Thread(target=dpp_ap_pb_delayed_start, args=(hapd,))
t.start()
sigma_dut_cmd_check("sta_reset_default,interface,%s,prog,DPP" % ifname)
cmd = "dev_exec_action,program,DPP,DPPActionType,AutomaticDPP,DPPAuthRole,Responder,DPPProvisioningRole,Enrollee,DPPBS,PBBS,DPPTimeout,50,DPPWaitForConnect,Yes"
res = sigma_dut_cmd(cmd, timeout=60, dump_dev=dev[0])
t.join()
if "BootstrapResult,OK,AuthResult,OK,ConfResult,OK,NetworkConnectResult,OK" not in res:
raise Exception("Unexpected result: " + res)
ev = hapd.wait_event(["DPP-PB-RESULT"], timeout=1)
if ev is None or "success" not in ev:
raise Exception("Push button bootstrapping did not succeed on AP")
finally:
stop_sigma_dut(sigma)
dev[0].set("dpp_config_processing", "0")
def dpp_ap_pb_overlap(hapd, hapd2, dev0):
if "OK" not in hapd.request("DPP_PUSH_BUTTON"):
raise Exception("Failed to press push button on the AP")