tests: Add p2ps_provision() method
Add generic provision method. This method receives a seeker and an advertiser devices, advertisement id, method, and a flag which indicates whether deferred flow is expected. The method returns P2PS-PROV-DONE events and the pin (if keypad or display method is used). This method is needed to simplify the P2PS provision flows in the tests. This method complies to the P2PS specification regarding the expected order of the show and display PIN events. Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski@intel.com> Reviewed-by: Max Stepanov <Max.Stepanov@intel.com> Reviewed-by: Ilan Peer <ilan.peer@intel.com>
This commit is contained in:
parent
1a1b015622
commit
4796021088
1 changed files with 102 additions and 0 deletions
|
@ -8,6 +8,7 @@ import logging
|
|||
logger = logging.getLogger()
|
||||
import time
|
||||
import random
|
||||
import re
|
||||
|
||||
import hwsim_utils
|
||||
from wpasupplicant import WpaSupplicant
|
||||
|
@ -122,6 +123,107 @@ def p2ps_nonexact_seek(i_dev, r_dev, svc_name, srv_info=None, adv_num=None):
|
|||
ev_list.append(''.join([adv_id, ' ', rcvd_svc_name]))
|
||||
return ev_list
|
||||
|
||||
def p2ps_parse_event(ev, *args):
|
||||
ret = ()
|
||||
for arg in args:
|
||||
m = re.search("\s+" + arg + r"=(\S+)", ev)
|
||||
ret += (m.group(1) if m is not None else None,)
|
||||
return ret
|
||||
|
||||
def p2ps_provision(seeker, advertiser, adv_id, auto_accept=True, method="1000"):
|
||||
addr0 = seeker.p2p_dev_addr()
|
||||
addr1 = advertiser.p2p_dev_addr()
|
||||
|
||||
seeker.asp_provision(addr1, adv_id=str(adv_id), adv_mac=addr1, session_id=1,
|
||||
session_mac=addr0, method=method)
|
||||
|
||||
if not auto_accept or method == "100":
|
||||
pin = None
|
||||
ev_pd_start = advertiser.wait_global_event(["P2PS-PROV-START"],
|
||||
timeout=10)
|
||||
if ev_pd_start is None:
|
||||
raise Exception("P2PS-PROV-START timeout on Advertiser side")
|
||||
peer = ev_pd_start.split()[1]
|
||||
advert_id, advert_mac, session, session_mac =\
|
||||
p2ps_parse_event(ev_pd_start, "adv_id", "adv_mac", "session", "mac")
|
||||
|
||||
ev = seeker.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-FAILURE timeout on seeker side")
|
||||
|
||||
if method == "100":
|
||||
ev = advertiser.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"],
|
||||
timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on advertiser side")
|
||||
if addr0 not in ev:
|
||||
raise Exception("Unknown peer " + addr0)
|
||||
ev = seeker.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"],
|
||||
timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on seeker side")
|
||||
if addr1 not in ev:
|
||||
raise Exception("Unknown peer " + addr1)
|
||||
pin = ev.split()[2]
|
||||
elif method == "8":
|
||||
ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"],
|
||||
timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side")
|
||||
if addr0 not in ev:
|
||||
raise Exception("Unknown peer " + addr0)
|
||||
pin = ev.split()[2]
|
||||
|
||||
advertiser.asp_provision(peer, adv_id=advert_id, adv_mac=advert_mac,
|
||||
session_id=int(session, 0),
|
||||
session_mac=session_mac, status=12)
|
||||
|
||||
ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
|
||||
ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev2 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertiser side")
|
||||
|
||||
if method == "8":
|
||||
ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"],
|
||||
timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN failed on seeker side")
|
||||
if addr1 not in ev:
|
||||
raise Exception("Unknown peer " + addr1)
|
||||
|
||||
if pin is not None:
|
||||
return ev1, ev2, pin
|
||||
return ev1, ev2
|
||||
|
||||
# Auto-accept is true and the method is either P2PS or advertiser is DISPLAY
|
||||
ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
|
||||
ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev2 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertiser side")
|
||||
|
||||
if method == "8":
|
||||
ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on seeker side")
|
||||
if addr1 not in ev:
|
||||
raise Exception("Unknown peer " + addr1)
|
||||
ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"],
|
||||
timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side")
|
||||
if addr0 not in ev:
|
||||
raise Exception("Unknown peer " + addr0)
|
||||
pin = ev.split()[2]
|
||||
return ev1, ev2, pin
|
||||
|
||||
return ev1, ev2
|
||||
|
||||
def p2p_connect_p2ps_method(i_dev, r_dev, autoaccept):
|
||||
"""P2PS connect function with p2ps method"""
|
||||
if autoaccept == False:
|
||||
|
|
Loading…
Reference in a new issue