tests: DPP/PKEX and local failures

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
This commit is contained in:
Jouni Malinen 2017-11-29 13:45:39 +02:00 committed by Jouni Malinen
parent 1866dfb52c
commit 818e3c94b7
2 changed files with 272 additions and 4 deletions

View file

@ -10,7 +10,7 @@ import time
import hostapd
import hwsim_utils
from utils import HwsimSkip
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
from wpasupplicant import WpaSupplicant
def check_dpp_capab(dev, brainpool=False):
@ -3458,3 +3458,271 @@ def test_dpp_qr_code_chan_list_no_match(dev, apdev):
cmd = "DPP_AUTH_INIT peer=%d" % id1
if "FAIL" not in dev[1].request(cmd):
raise Exception("DPP Authentication started unexpectedly")
def test_dpp_pkex_alloc_fail(dev, apdev):
"""DPP/PKEX and memory allocation failures"""
check_dpp_capab(dev[0])
check_dpp_capab(dev[1])
tests = [ (1, "=dpp_keygen_configurator"),
(1, "base64_gen_encode;dpp_keygen_configurator") ]
for count, func in tests:
with alloc_fail(dev[1], count, func):
cmd = "DPP_CONFIGURATOR_ADD"
res = dev[1].request(cmd);
if "FAIL" not in res:
raise Exception("Unexpected DPP_CONFIGURATOR_ADD success")
cmd = "DPP_CONFIGURATOR_ADD"
res = dev[1].request(cmd);
if "FAIL" in res:
raise Exception("Failed to add configurator")
conf_id = int(res)
cmd = "DPP_BOOTSTRAP_GEN type=pkex"
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
id0 = int(res)
cmd = "DPP_BOOTSTRAP_GEN type=pkex"
res = dev[1].request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
id1 = int(res)
# Local error cases on the Initiator
tests = [ (1, "dpp_get_pubkey_point"),
(1, "dpp_alloc_msg;dpp_pkex_build_exchange_req"),
(1, "dpp_alloc_msg;dpp_pkex_build_commit_reveal_req"),
(1, "dpp_alloc_msg;dpp_auth_build_req"),
(1, "dpp_alloc_msg;dpp_auth_build_conf"),
(1, "dpp_bootstrap_key_hash"),
(1, "dpp_auth_init"),
(1, "=dpp_auth_resp_rx"),
(2, "=dpp_auth_resp_rx"),
(1, "dpp_build_conf_start"),
(1, "dpp_build_conf_obj_dpp"),
(2, "dpp_build_conf_obj_dpp"),
(3, "dpp_build_conf_obj_dpp"),
(4, "dpp_build_conf_obj_dpp"),
(5, "dpp_build_conf_obj_dpp"),
(6, "dpp_build_conf_obj_dpp"),
(7, "dpp_build_conf_obj_dpp"),
(8, "dpp_build_conf_obj_dpp"),
(1, "dpp_conf_req_rx"),
(2, "dpp_conf_req_rx"),
(3, "dpp_conf_req_rx"),
(4, "dpp_conf_req_rx"),
(5, "dpp_conf_req_rx"),
(6, "dpp_conf_req_rx"),
(7, "dpp_conf_req_rx"),
(1, "dpp_pkex_init"),
(2, "dpp_pkex_init"),
(3, "dpp_pkex_init"),
(1, "dpp_pkex_derive_z"),
(1, "=dpp_pkex_rx_commit_reveal_resp"),
(1, "dpp_get_pubkey_point;dpp_build_jwk"),
(2, "dpp_get_pubkey_point;dpp_build_jwk"),
(1, "dpp_get_pubkey_point;dpp_auth_init") ]
for count, func in tests:
dev[0].request("DPP_STOP_LISTEN")
dev[1].request("DPP_STOP_LISTEN")
dev[0].dump_monitor()
dev[1].dump_monitor()
cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
cmd = "DPP_LISTEN 2437"
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
with alloc_fail(dev[1], count, func):
cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
dev[1].request(cmd)
wait_fail_trigger(dev[1], "GET_ALLOC_FAIL")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
if ev:
dev[0].request("DPP_STOP_LISTEN")
dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3)
# Local error cases on the Responder
tests = [ (1, "dpp_get_pubkey_point"),
(1, "dpp_alloc_msg;dpp_pkex_build_exchange_resp"),
(1, "dpp_alloc_msg;dpp_pkex_build_commit_reveal_resp"),
(1, "dpp_alloc_msg;dpp_auth_build_resp"),
(1, "dpp_get_pubkey_point;dpp_auth_build_resp_ok"),
(1, "=dpp_auth_req_rx"),
(2, "=dpp_auth_req_rx"),
(1, "=dpp_auth_conf_rx"),
(1, "json_parse;dpp_parse_jws_prot_hdr"),
(1, "json_get_member_base64url;dpp_parse_jws_prot_hdr"),
(1, "json_get_member_base64url;dpp_parse_jwk"),
(2, "json_get_member_base64url;dpp_parse_jwk"),
(1, "json_parse;dpp_parse_connector"),
(1, "dpp_parse_jwk;dpp_parse_connector"),
(1, "dpp_parse_jwk;dpp_parse_cred_dpp"),
(1, "dpp_get_pubkey_point;dpp_check_pubkey_match"),
(1, "base64_gen_decode;dpp_process_signed_connector"),
(1, "dpp_parse_jws_prot_hdr;dpp_process_signed_connector"),
(2, "base64_gen_decode;dpp_process_signed_connector"),
(3, "base64_gen_decode;dpp_process_signed_connector"),
(4, "base64_gen_decode;dpp_process_signed_connector"),
(1, "json_parse;dpp_parse_conf_obj"),
(1, "dpp_conf_resp_rx"),
(1, "=dpp_pkex_derive_z"),
(1, "=dpp_pkex_rx_exchange_req"),
(2, "=dpp_pkex_rx_exchange_req"),
(3, "=dpp_pkex_rx_exchange_req"),
(1, "=dpp_pkex_rx_commit_reveal_req"),
(1, "dpp_get_pubkey_point;dpp_pkex_rx_commit_reveal_req"),
(1, "dpp_bootstrap_key_hash") ]
for count, func in tests:
dev[0].request("DPP_STOP_LISTEN")
dev[1].request("DPP_STOP_LISTEN")
dev[0].dump_monitor()
dev[1].dump_monitor()
cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
cmd = "DPP_LISTEN 2437"
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
with alloc_fail(dev[0], count, func):
cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
dev[1].request(cmd)
wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
if ev:
dev[0].request("DPP_STOP_LISTEN")
dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3)
def test_dpp_pkex_test_fail(dev, apdev):
"""DPP/PKEX and local failures"""
check_dpp_capab(dev[0])
check_dpp_capab(dev[1])
tests = [ (1, "dpp_keygen_configurator") ]
for count, func in tests:
with fail_test(dev[1], count, func):
cmd = "DPP_CONFIGURATOR_ADD"
res = dev[1].request(cmd);
if "FAIL" not in res:
raise Exception("Unexpected DPP_CONFIGURATOR_ADD success")
tests = [ (1, "dpp_keygen") ]
for count, func in tests:
with fail_test(dev[1], count, func):
cmd = "DPP_BOOTSTRAP_GEN type=pkex"
res = dev[1].request(cmd);
if "FAIL" not in res:
raise Exception("Unexpected DPP_BOOTSTRAP_GEN success")
cmd = "DPP_CONFIGURATOR_ADD"
res = dev[1].request(cmd);
if "FAIL" in res:
raise Exception("Failed to add configurator")
conf_id = int(res)
cmd = "DPP_BOOTSTRAP_GEN type=pkex"
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
id0 = int(res)
cmd = "DPP_BOOTSTRAP_GEN type=pkex"
res = dev[1].request(cmd)
if "FAIL" in res:
raise Exception("Failed to generate bootstrapping info")
id1 = int(res)
# Local error cases on the Initiator
tests = [ (1, "aes_siv_encrypt;dpp_auth_build_req"),
(1, "os_get_random;dpp_auth_init"),
(1, "dpp_derive_k1;dpp_auth_init"),
(1, "dpp_hkdf_expand;dpp_derive_k1;dpp_auth_init"),
(1, "dpp_gen_i_auth;dpp_auth_build_conf"),
(1, "aes_siv_encrypt;dpp_auth_build_conf"),
(1, "dpp_derive_k2;dpp_auth_resp_rx"),
(1, "dpp_hkdf_expand;dpp_derive_k2;dpp_auth_resp_rx"),
(1, "dpp_derive_ke;dpp_auth_resp_rx"),
(1, "dpp_hkdf_expand;dpp_derive_ke;dpp_auth_resp_rx"),
(1, "dpp_gen_r_auth;dpp_auth_resp_rx"),
(1, "aes_siv_encrypt;dpp_build_conf_resp"),
(1, "dpp_pkex_derive_Qi;dpp_pkex_build_exchange_req"),
(1, "aes_siv_encrypt;dpp_pkex_build_commit_reveal_req"),
(1, "hmac_sha256_vector;dpp_pkex_rx_exchange_resp"),
(1, "aes_siv_decrypt;dpp_pkex_rx_commit_reveal_resp"),
(1, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_resp") ]
for count, func in tests:
dev[0].request("DPP_STOP_LISTEN")
dev[1].request("DPP_STOP_LISTEN")
dev[0].dump_monitor()
dev[1].dump_monitor()
cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
cmd = "DPP_LISTEN 2437"
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
with fail_test(dev[1], count, func):
cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
dev[1].request(cmd)
wait_fail_trigger(dev[1], "GET_FAIL")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
if ev:
dev[0].request("DPP_STOP_LISTEN")
dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3)
# Local error cases on the Responder
tests = [ (1, "aes_siv_encrypt;dpp_auth_build_resp"),
(1, "os_get_random;dpp_build_conf_req"),
(1, "aes_siv_encrypt;dpp_build_conf_req"),
(1, "os_get_random;dpp_auth_build_resp_ok"),
(1, "dpp_derive_k2;dpp_auth_build_resp_ok"),
(1, "dpp_derive_ke;dpp_auth_build_resp_ok"),
(1, "dpp_gen_r_auth;dpp_auth_build_resp_ok"),
(1, "aes_siv_encrypt;dpp_auth_build_resp_ok"),
(1, "dpp_derive_k1;dpp_auth_req_rx"),
(1, "aes_siv_decrypt;dpp_auth_req_rx"),
(1, "aes_siv_decrypt;dpp_auth_conf_rx"),
(1, "dpp_gen_i_auth;dpp_auth_conf_rx"),
(1, "dpp_check_pubkey_match"),
(1, "aes_siv_decrypt;dpp_conf_resp_rx"),
(1, "hmac_sha256_kdf;dpp_pkex_derive_z"),
(1, "dpp_pkex_derive_Qi;dpp_pkex_rx_exchange_req"),
(1, "dpp_pkex_derive_Qr;dpp_pkex_rx_exchange_req"),
(1, "aes_siv_encrypt;dpp_pkex_build_commit_reveal_resp"),
(1, "aes_siv_decrypt;dpp_pkex_rx_commit_reveal_req"),
(1, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_req"),
(2, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_req") ]
for count, func in tests:
dev[0].request("DPP_STOP_LISTEN")
dev[1].request("DPP_STOP_LISTEN")
dev[0].dump_monitor()
dev[1].dump_monitor()
cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0)
res = dev[0].request(cmd)
if "FAIL" in res:
raise Exception("Failed to set PKEX data (responder)")
cmd = "DPP_LISTEN 2437"
if "OK" not in dev[0].request(cmd):
raise Exception("Failed to start listen operation")
with fail_test(dev[0], count, func):
cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id)
dev[1].request(cmd)
wait_fail_trigger(dev[0], "GET_FAIL", max_iter=100)
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01)
if ev:
dev[0].request("DPP_STOP_LISTEN")
dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3)

View file

@ -54,11 +54,11 @@ class fail_test(object):
if self._dev.request("GET_FAIL") != "0:%s" % self._funcs:
raise Exception("Test failure did not trigger")
def wait_fail_trigger(dev, cmd, note="Failure not triggered"):
for i in range(0, 40):
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40):
for i in range(0, max_iter):
if dev.request(cmd).startswith("0:"):
break
if i == 39:
if i == max_iter - 1:
raise Exception(note)
time.sleep(0.05)