diff --git a/tests/hwsim/test_dpp.py b/tests/hwsim/test_dpp.py index 5898d7747..1b689d684 100644 --- a/tests/hwsim/test_dpp.py +++ b/tests/hwsim/test_dpp.py @@ -10,7 +10,7 @@ import time import hostapd import hwsim_utils -from utils import HwsimSkip +from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger from wpasupplicant import WpaSupplicant def check_dpp_capab(dev, brainpool=False): @@ -3458,3 +3458,271 @@ def test_dpp_qr_code_chan_list_no_match(dev, apdev): cmd = "DPP_AUTH_INIT peer=%d" % id1 if "FAIL" not in dev[1].request(cmd): raise Exception("DPP Authentication started unexpectedly") + +def test_dpp_pkex_alloc_fail(dev, apdev): + """DPP/PKEX and memory allocation failures""" + check_dpp_capab(dev[0]) + check_dpp_capab(dev[1]) + + tests = [ (1, "=dpp_keygen_configurator"), + (1, "base64_gen_encode;dpp_keygen_configurator") ] + for count, func in tests: + with alloc_fail(dev[1], count, func): + cmd = "DPP_CONFIGURATOR_ADD" + res = dev[1].request(cmd); + if "FAIL" not in res: + raise Exception("Unexpected DPP_CONFIGURATOR_ADD success") + + cmd = "DPP_CONFIGURATOR_ADD" + res = dev[1].request(cmd); + if "FAIL" in res: + raise Exception("Failed to add configurator") + conf_id = int(res) + + cmd = "DPP_BOOTSTRAP_GEN type=pkex" + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to generate bootstrapping info") + id0 = int(res) + + cmd = "DPP_BOOTSTRAP_GEN type=pkex" + res = dev[1].request(cmd) + if "FAIL" in res: + raise Exception("Failed to generate bootstrapping info") + id1 = int(res) + + # Local error cases on the Initiator + tests = [ (1, "dpp_get_pubkey_point"), + (1, "dpp_alloc_msg;dpp_pkex_build_exchange_req"), + (1, "dpp_alloc_msg;dpp_pkex_build_commit_reveal_req"), + (1, "dpp_alloc_msg;dpp_auth_build_req"), + (1, "dpp_alloc_msg;dpp_auth_build_conf"), + (1, "dpp_bootstrap_key_hash"), + (1, "dpp_auth_init"), + (1, "=dpp_auth_resp_rx"), + (2, "=dpp_auth_resp_rx"), + (1, "dpp_build_conf_start"), + (1, "dpp_build_conf_obj_dpp"), + (2, "dpp_build_conf_obj_dpp"), + (3, "dpp_build_conf_obj_dpp"), + (4, "dpp_build_conf_obj_dpp"), + (5, "dpp_build_conf_obj_dpp"), + (6, "dpp_build_conf_obj_dpp"), + (7, "dpp_build_conf_obj_dpp"), + (8, "dpp_build_conf_obj_dpp"), + (1, "dpp_conf_req_rx"), + (2, "dpp_conf_req_rx"), + (3, "dpp_conf_req_rx"), + (4, "dpp_conf_req_rx"), + (5, "dpp_conf_req_rx"), + (6, "dpp_conf_req_rx"), + (7, "dpp_conf_req_rx"), + (1, "dpp_pkex_init"), + (2, "dpp_pkex_init"), + (3, "dpp_pkex_init"), + (1, "dpp_pkex_derive_z"), + (1, "=dpp_pkex_rx_commit_reveal_resp"), + (1, "dpp_get_pubkey_point;dpp_build_jwk"), + (2, "dpp_get_pubkey_point;dpp_build_jwk"), + (1, "dpp_get_pubkey_point;dpp_auth_init") ] + for count, func in tests: + dev[0].request("DPP_STOP_LISTEN") + dev[1].request("DPP_STOP_LISTEN") + dev[0].dump_monitor() + dev[1].dump_monitor() + + cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0) + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (responder)") + cmd = "DPP_LISTEN 2437" + if "OK" not in dev[0].request(cmd): + raise Exception("Failed to start listen operation") + + with alloc_fail(dev[1], count, func): + cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id) + dev[1].request(cmd) + wait_fail_trigger(dev[1], "GET_ALLOC_FAIL") + ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01) + if ev: + dev[0].request("DPP_STOP_LISTEN") + dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3) + + # Local error cases on the Responder + tests = [ (1, "dpp_get_pubkey_point"), + (1, "dpp_alloc_msg;dpp_pkex_build_exchange_resp"), + (1, "dpp_alloc_msg;dpp_pkex_build_commit_reveal_resp"), + (1, "dpp_alloc_msg;dpp_auth_build_resp"), + (1, "dpp_get_pubkey_point;dpp_auth_build_resp_ok"), + (1, "=dpp_auth_req_rx"), + (2, "=dpp_auth_req_rx"), + (1, "=dpp_auth_conf_rx"), + (1, "json_parse;dpp_parse_jws_prot_hdr"), + (1, "json_get_member_base64url;dpp_parse_jws_prot_hdr"), + (1, "json_get_member_base64url;dpp_parse_jwk"), + (2, "json_get_member_base64url;dpp_parse_jwk"), + (1, "json_parse;dpp_parse_connector"), + (1, "dpp_parse_jwk;dpp_parse_connector"), + (1, "dpp_parse_jwk;dpp_parse_cred_dpp"), + (1, "dpp_get_pubkey_point;dpp_check_pubkey_match"), + (1, "base64_gen_decode;dpp_process_signed_connector"), + (1, "dpp_parse_jws_prot_hdr;dpp_process_signed_connector"), + (2, "base64_gen_decode;dpp_process_signed_connector"), + (3, "base64_gen_decode;dpp_process_signed_connector"), + (4, "base64_gen_decode;dpp_process_signed_connector"), + (1, "json_parse;dpp_parse_conf_obj"), + (1, "dpp_conf_resp_rx"), + (1, "=dpp_pkex_derive_z"), + (1, "=dpp_pkex_rx_exchange_req"), + (2, "=dpp_pkex_rx_exchange_req"), + (3, "=dpp_pkex_rx_exchange_req"), + (1, "=dpp_pkex_rx_commit_reveal_req"), + (1, "dpp_get_pubkey_point;dpp_pkex_rx_commit_reveal_req"), + (1, "dpp_bootstrap_key_hash") ] + for count, func in tests: + dev[0].request("DPP_STOP_LISTEN") + dev[1].request("DPP_STOP_LISTEN") + dev[0].dump_monitor() + dev[1].dump_monitor() + + cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0) + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (responder)") + cmd = "DPP_LISTEN 2437" + if "OK" not in dev[0].request(cmd): + raise Exception("Failed to start listen operation") + + with alloc_fail(dev[0], count, func): + cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id) + dev[1].request(cmd) + wait_fail_trigger(dev[0], "GET_ALLOC_FAIL") + ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01) + if ev: + dev[0].request("DPP_STOP_LISTEN") + dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3) + +def test_dpp_pkex_test_fail(dev, apdev): + """DPP/PKEX and local failures""" + check_dpp_capab(dev[0]) + check_dpp_capab(dev[1]) + + tests = [ (1, "dpp_keygen_configurator") ] + for count, func in tests: + with fail_test(dev[1], count, func): + cmd = "DPP_CONFIGURATOR_ADD" + res = dev[1].request(cmd); + if "FAIL" not in res: + raise Exception("Unexpected DPP_CONFIGURATOR_ADD success") + + tests = [ (1, "dpp_keygen") ] + for count, func in tests: + with fail_test(dev[1], count, func): + cmd = "DPP_BOOTSTRAP_GEN type=pkex" + res = dev[1].request(cmd); + if "FAIL" not in res: + raise Exception("Unexpected DPP_BOOTSTRAP_GEN success") + + cmd = "DPP_CONFIGURATOR_ADD" + res = dev[1].request(cmd); + if "FAIL" in res: + raise Exception("Failed to add configurator") + conf_id = int(res) + + cmd = "DPP_BOOTSTRAP_GEN type=pkex" + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to generate bootstrapping info") + id0 = int(res) + + cmd = "DPP_BOOTSTRAP_GEN type=pkex" + res = dev[1].request(cmd) + if "FAIL" in res: + raise Exception("Failed to generate bootstrapping info") + id1 = int(res) + + # Local error cases on the Initiator + tests = [ (1, "aes_siv_encrypt;dpp_auth_build_req"), + (1, "os_get_random;dpp_auth_init"), + (1, "dpp_derive_k1;dpp_auth_init"), + (1, "dpp_hkdf_expand;dpp_derive_k1;dpp_auth_init"), + (1, "dpp_gen_i_auth;dpp_auth_build_conf"), + (1, "aes_siv_encrypt;dpp_auth_build_conf"), + (1, "dpp_derive_k2;dpp_auth_resp_rx"), + (1, "dpp_hkdf_expand;dpp_derive_k2;dpp_auth_resp_rx"), + (1, "dpp_derive_ke;dpp_auth_resp_rx"), + (1, "dpp_hkdf_expand;dpp_derive_ke;dpp_auth_resp_rx"), + (1, "dpp_gen_r_auth;dpp_auth_resp_rx"), + (1, "aes_siv_encrypt;dpp_build_conf_resp"), + (1, "dpp_pkex_derive_Qi;dpp_pkex_build_exchange_req"), + (1, "aes_siv_encrypt;dpp_pkex_build_commit_reveal_req"), + (1, "hmac_sha256_vector;dpp_pkex_rx_exchange_resp"), + (1, "aes_siv_decrypt;dpp_pkex_rx_commit_reveal_resp"), + (1, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_resp") ] + for count, func in tests: + dev[0].request("DPP_STOP_LISTEN") + dev[1].request("DPP_STOP_LISTEN") + dev[0].dump_monitor() + dev[1].dump_monitor() + + cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0) + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (responder)") + cmd = "DPP_LISTEN 2437" + if "OK" not in dev[0].request(cmd): + raise Exception("Failed to start listen operation") + + with fail_test(dev[1], count, func): + cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id) + dev[1].request(cmd) + wait_fail_trigger(dev[1], "GET_FAIL") + ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01) + if ev: + dev[0].request("DPP_STOP_LISTEN") + dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3) + + # Local error cases on the Responder + tests = [ (1, "aes_siv_encrypt;dpp_auth_build_resp"), + (1, "os_get_random;dpp_build_conf_req"), + (1, "aes_siv_encrypt;dpp_build_conf_req"), + (1, "os_get_random;dpp_auth_build_resp_ok"), + (1, "dpp_derive_k2;dpp_auth_build_resp_ok"), + (1, "dpp_derive_ke;dpp_auth_build_resp_ok"), + (1, "dpp_gen_r_auth;dpp_auth_build_resp_ok"), + (1, "aes_siv_encrypt;dpp_auth_build_resp_ok"), + (1, "dpp_derive_k1;dpp_auth_req_rx"), + (1, "aes_siv_decrypt;dpp_auth_req_rx"), + (1, "aes_siv_decrypt;dpp_auth_conf_rx"), + (1, "dpp_gen_i_auth;dpp_auth_conf_rx"), + (1, "dpp_check_pubkey_match"), + (1, "aes_siv_decrypt;dpp_conf_resp_rx"), + (1, "hmac_sha256_kdf;dpp_pkex_derive_z"), + (1, "dpp_pkex_derive_Qi;dpp_pkex_rx_exchange_req"), + (1, "dpp_pkex_derive_Qr;dpp_pkex_rx_exchange_req"), + (1, "aes_siv_encrypt;dpp_pkex_build_commit_reveal_resp"), + (1, "aes_siv_decrypt;dpp_pkex_rx_commit_reveal_req"), + (1, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_req"), + (2, "hmac_sha256_vector;dpp_pkex_rx_commit_reveal_req") ] + for count, func in tests: + dev[0].request("DPP_STOP_LISTEN") + dev[1].request("DPP_STOP_LISTEN") + dev[0].dump_monitor() + dev[1].dump_monitor() + + cmd = "DPP_PKEX_ADD own=%d identifier=test code=secret" % (id0) + res = dev[0].request(cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (responder)") + cmd = "DPP_LISTEN 2437" + if "OK" not in dev[0].request(cmd): + raise Exception("Failed to start listen operation") + + with fail_test(dev[0], count, func): + cmd = "DPP_PKEX_ADD own=%d identifier=test init=1 conf=sta-dpp configurator=%d code=secret" % (id1, conf_id) + dev[1].request(cmd) + wait_fail_trigger(dev[0], "GET_FAIL", max_iter=100) + ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=0.01) + if ev: + dev[0].request("DPP_STOP_LISTEN") + dev[0].wait_event(["GAS-QUERY-DONE"], timeout=3) diff --git a/tests/hwsim/utils.py b/tests/hwsim/utils.py index 69d3cff39..f19b41333 100644 --- a/tests/hwsim/utils.py +++ b/tests/hwsim/utils.py @@ -54,11 +54,11 @@ class fail_test(object): if self._dev.request("GET_FAIL") != "0:%s" % self._funcs: raise Exception("Test failure did not trigger") -def wait_fail_trigger(dev, cmd, note="Failure not triggered"): - for i in range(0, 40): +def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40): + for i in range(0, max_iter): if dev.request(cmd).startswith("0:"): break - if i == 39: + if i == max_iter - 1: raise Exception(note) time.sleep(0.05)