From b007bfdf683e7fada0c122f379e30572071e96e6 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Tue, 25 Jan 2022 00:35:58 +0200 Subject: [PATCH] tests: DPP PKEX over TCP Signed-off-by: Jouni Malinen --- tests/hwsim/test_dpp3.py | 122 ++++++++++++++++++++++++++++++++++- tests/hwsim/wpasupplicant.py | 7 +- 2 files changed, 127 insertions(+), 2 deletions(-) diff --git a/tests/hwsim/test_dpp3.py b/tests/hwsim/test_dpp3.py index e50f199f3..4e5e808b9 100644 --- a/tests/hwsim/test_dpp3.py +++ b/tests/hwsim/test_dpp3.py @@ -4,7 +4,12 @@ # This software may be distributed under the terms of the BSD license. # See README for more details. -from test_dpp import check_dpp_capab, run_dpp_auto_connect +import os +import time + +import hostapd +from wlantest import WlantestCapture +from test_dpp import check_dpp_capab, run_dpp_auto_connect, wait_auth_success, update_hapd_config def test_dpp_network_intro_version(dev, apdev): """DPP Network Introduction and protocol version""" @@ -47,3 +52,118 @@ def test_dpp_network_intro_version_missing_req(dev, apdev): raise Exception("Unexpected network introduction result on STA: " + ev) finally: dev[0].set("dpp_config_processing", "0", allow_fail=True) + +def run_dpp_tcp_pkex(dev0, dev1, cap_lo): + check_dpp_capab(dev0, min_ver=3) + check_dpp_capab(dev1, min_ver=3) + + wt = WlantestCapture('lo', cap_lo) + time.sleep(1) + + # Controller + conf_id = dev1.dpp_configurator_add() + dev1.set("dpp_configurator_params", + " conf=sta-dpp configurator=%d" % conf_id) + + req = "DPP_CONTROLLER_START" + own = None + if "OK" not in dev1.request(req): + raise Exception("Failed to start Controller") + + code = "secret" + + id1 = dev1.dpp_bootstrap_gen(type="pkex") + cmd = "own=%d" % id1 + cmd += " code=%s" % code + res = dev1.request("DPP_PKEX_ADD " + cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (responder)") + + dev0.dpp_pkex_init(identifier=None, code=code, role="enrollee", + tcp_addr="127.0.0.1") + + wait_auth_success(dev1, dev0, configurator=dev1, enrollee=dev0, + allow_enrollee_failure=True, + allow_configurator_failure=True) + time.sleep(0.5) + wt.close() + +def test_dpp_tcp_pkex(dev, apdev, params): + """DPP/PKEXv2 over TCP""" + prefix = "dpp_tcp_pkex" + cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap") + try: + run_dpp_tcp_pkex(dev[0], dev[1], cap_lo) + finally: + dev[1].request("DPP_CONTROLLER_STOP") + +def test_dpp_controller_relay_pkex(dev, apdev, params): + """DPP Controller/Relay with PKEX""" + try: + run_dpp_controller_relay_pkex(dev, apdev, params) + finally: + dev[0].set("dpp_config_processing", "0", allow_fail=True) + dev[1].request("DPP_CONTROLLER_STOP") + +def run_dpp_controller_relay_pkex(dev, apdev, params): + check_dpp_capab(dev[0], min_ver=2) + check_dpp_capab(dev[1], min_ver=2) + prefix = "dpp_controller_relay_pkex" + cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap") + + wt = WlantestCapture('lo', cap_lo) + + # Controller + conf_id = dev[1].dpp_configurator_add() + dev[1].set("dpp_configurator_params", + "conf=sta-dpp configurator=%d" % conf_id) + id_c = dev[1].dpp_bootstrap_gen() + res = dev[1].request("DPP_BOOTSTRAP_INFO %d" % id_c) + pkhash = None + for line in res.splitlines(): + name, value = line.split('=') + if name == "pkhash": + pkhash = value + break + if not pkhash: + raise Exception("Could not fetch public key hash from Controller") + if "OK" not in dev[1].request("DPP_CONTROLLER_START"): + raise Exception("Failed to start Controller") + + # Relay + params = {"ssid": "unconfigured", + "channel": "6", + "dpp_controller": "ipaddr=127.0.0.1 pkhash=" + pkhash} + relay = hostapd.add_ap(apdev[1], params) + check_dpp_capab(relay) + + # Enroll Relay to the network + id_h = relay.dpp_bootstrap_gen(chan="81/6", mac=True) + uri_r = relay.request("DPP_BOOTSTRAP_GET_URI %d" % id_h) + dev[1].dpp_auth_init(uri=uri_r, conf="ap-dpp", configurator=conf_id) + wait_auth_success(relay, dev[1], configurator=dev[1], enrollee=relay) + update_hapd_config(relay) + + code = "secret" + id1 = dev[1].dpp_bootstrap_gen(type="pkex") + cmd = "own=%d" % id1 + cmd += " code=%s" % code + res = dev[1].request("DPP_PKEX_ADD " + cmd) + if "FAIL" in res: + raise Exception("Failed to set PKEX data (Controller)") + + # Initiate PKEX from Enrollee + dev[0].set("dpp_config_processing", "2") + dev[0].dpp_pkex_init(identifier=None, code=code, role="enrollee") + wait_auth_success(dev[1], dev[0], configurator=dev[1], enrollee=dev[0], + allow_enrollee_failure=True, + allow_configurator_failure=True) + ev = dev[0].wait_event(["DPP-NETWORK-ID"], timeout=1) + if ev is None: + raise Exception("DPP network id not reported") + network = int(ev.split(' ')[1]) + dev[0].wait_connected() + dev[0].dump_monitor() + + time.sleep(0.5) + wt.close() diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 1f4a6e048..4a94d9122 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -1577,7 +1577,8 @@ class WpaSupplicant: return int(peer) def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None, - extra=None, use_id=None, allow_fail=False, ver=None): + extra=None, use_id=None, allow_fail=False, ver=None, + tcp_addr=None, tcp_port=None): if use_id is None: id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve) else: @@ -1590,6 +1591,10 @@ class WpaSupplicant: cmd += "ver=" + str(ver) + " " if role: cmd += "role=%s " % role + if tcp_addr: + cmd += "tcp_addr=" + tcp_addr + " " + if tcp_port: + cmd += "tcp_port=" + tcp_port + " " if extra: cmd += extra + " " cmd += "code=%s" % code