diff --git a/tests/hwsim/test_p2ps.py b/tests/hwsim/test_p2ps.py index dbe924e22..0fa182cd5 100644 --- a/tests/hwsim/test_p2ps.py +++ b/tests/hwsim/test_p2ps.py @@ -8,6 +8,7 @@ import logging logger = logging.getLogger() import time import random +import re import hwsim_utils from wpasupplicant import WpaSupplicant @@ -122,6 +123,107 @@ def p2ps_nonexact_seek(i_dev, r_dev, svc_name, srv_info=None, adv_num=None): ev_list.append(''.join([adv_id, ' ', rcvd_svc_name])) return ev_list +def p2ps_parse_event(ev, *args): + ret = () + for arg in args: + m = re.search("\s+" + arg + r"=(\S+)", ev) + ret += (m.group(1) if m is not None else None,) + return ret + +def p2ps_provision(seeker, advertiser, adv_id, auto_accept=True, method="1000"): + addr0 = seeker.p2p_dev_addr() + addr1 = advertiser.p2p_dev_addr() + + seeker.asp_provision(addr1, adv_id=str(adv_id), adv_mac=addr1, session_id=1, + session_mac=addr0, method=method) + + if not auto_accept or method == "100": + pin = None + ev_pd_start = advertiser.wait_global_event(["P2PS-PROV-START"], + timeout=10) + if ev_pd_start is None: + raise Exception("P2PS-PROV-START timeout on Advertiser side") + peer = ev_pd_start.split()[1] + advert_id, advert_mac, session, session_mac =\ + p2ps_parse_event(ev_pd_start, "adv_id", "adv_mac", "session", "mac") + + ev = seeker.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-FAILURE timeout on seeker side") + + if method == "100": + ev = advertiser.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + ev = seeker.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + pin = ev.split()[2] + elif method == "8": + ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + pin = ev.split()[2] + + advertiser.asp_provision(peer, adv_id=advert_id, adv_mac=advert_mac, + session_id=int(session, 0), + session_mac=session_mac, status=12) + + ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev1 is None: + raise Exception("P2PS-PROV-DONE timeout on seeker side") + + ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev2 is None: + raise Exception("P2PS-PROV-DONE timeout on advertiser side") + + if method == "8": + ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN failed on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + + if pin is not None: + return ev1, ev2, pin + return ev1, ev2 + + # Auto-accept is true and the method is either P2PS or advertiser is DISPLAY + ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev1 is None: + raise Exception("P2PS-PROV-DONE timeout on seeker side") + + ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev2 is None: + raise Exception("P2PS-PROV-DONE timeout on advertiser side") + + if method == "8": + ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + pin = ev.split()[2] + return ev1, ev2, pin + + return ev1, ev2 + def p2p_connect_p2ps_method(i_dev, r_dev, autoaccept): """P2PS connect function with p2ps method""" if autoaccept == False: