From 4796021088bf7ae40cc46265e7cfb1266e666e67 Mon Sep 17 00:00:00 2001 From: Andrei Otcheretianski Date: Mon, 13 Jul 2015 09:49:11 +0300 Subject: [PATCH] tests: Add p2ps_provision() method Add generic provision method. This method receives a seeker and an advertiser devices, advertisement id, method, and a flag which indicates whether deferred flow is expected. The method returns P2PS-PROV-DONE events and the pin (if keypad or display method is used). This method is needed to simplify the P2PS provision flows in the tests. This method complies to the P2PS specification regarding the expected order of the show and display PIN events. Signed-off-by: Andrei Otcheretianski Reviewed-by: Max Stepanov Reviewed-by: Ilan Peer --- tests/hwsim/test_p2ps.py | 102 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/tests/hwsim/test_p2ps.py b/tests/hwsim/test_p2ps.py index dbe924e22..0fa182cd5 100644 --- a/tests/hwsim/test_p2ps.py +++ b/tests/hwsim/test_p2ps.py @@ -8,6 +8,7 @@ import logging logger = logging.getLogger() import time import random +import re import hwsim_utils from wpasupplicant import WpaSupplicant @@ -122,6 +123,107 @@ def p2ps_nonexact_seek(i_dev, r_dev, svc_name, srv_info=None, adv_num=None): ev_list.append(''.join([adv_id, ' ', rcvd_svc_name])) return ev_list +def p2ps_parse_event(ev, *args): + ret = () + for arg in args: + m = re.search("\s+" + arg + r"=(\S+)", ev) + ret += (m.group(1) if m is not None else None,) + return ret + +def p2ps_provision(seeker, advertiser, adv_id, auto_accept=True, method="1000"): + addr0 = seeker.p2p_dev_addr() + addr1 = advertiser.p2p_dev_addr() + + seeker.asp_provision(addr1, adv_id=str(adv_id), adv_mac=addr1, session_id=1, + session_mac=addr0, method=method) + + if not auto_accept or method == "100": + pin = None + ev_pd_start = advertiser.wait_global_event(["P2PS-PROV-START"], + timeout=10) + if ev_pd_start is None: + raise Exception("P2PS-PROV-START timeout on Advertiser side") + peer = ev_pd_start.split()[1] + advert_id, advert_mac, session, session_mac =\ + p2ps_parse_event(ev_pd_start, "adv_id", "adv_mac", "session", "mac") + + ev = seeker.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-FAILURE timeout on seeker side") + + if method == "100": + ev = advertiser.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + ev = seeker.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + pin = ev.split()[2] + elif method == "8": + ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + pin = ev.split()[2] + + advertiser.asp_provision(peer, adv_id=advert_id, adv_mac=advert_mac, + session_id=int(session, 0), + session_mac=session_mac, status=12) + + ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev1 is None: + raise Exception("P2PS-PROV-DONE timeout on seeker side") + + ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev2 is None: + raise Exception("P2PS-PROV-DONE timeout on advertiser side") + + if method == "8": + ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN failed on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + + if pin is not None: + return ev1, ev2, pin + return ev1, ev2 + + # Auto-accept is true and the method is either P2PS or advertiser is DISPLAY + ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev1 is None: + raise Exception("P2PS-PROV-DONE timeout on seeker side") + + ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10) + if ev2 is None: + raise Exception("P2PS-PROV-DONE timeout on advertiser side") + + if method == "8": + ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on seeker side") + if addr1 not in ev: + raise Exception("Unknown peer " + addr1) + ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], + timeout=10) + if ev is None: + raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side") + if addr0 not in ev: + raise Exception("Unknown peer " + addr0) + pin = ev.split()[2] + return ev1, ev2, pin + + return ev1, ev2 + def p2p_connect_p2ps_method(i_dev, r_dev, autoaccept): """P2PS connect function with p2ps method""" if autoaccept == False: