diff --git a/tests/hwsim/test_p2p_grpform.py b/tests/hwsim/test_p2p_grpform.py index 1b276e40f..19f5e06b5 100644 --- a/tests/hwsim/test_p2p_grpform.py +++ b/tests/hwsim/test_p2p_grpform.py @@ -20,7 +20,15 @@ def check_grpform_results(i_res, r_res): if i_res['ssid'] != r_res['ssid']: raise Exception("SSID mismatch") if i_res['freq'] != r_res['freq']: - raise Exception("SSID mismatch") + raise Exception("freq mismatch") + if 'go_neg_freq' in r_res and i_res['go_neg_freq'] != r_res['go_neg_freq']: + raise Exception("go_neg_freq mismatch") + if i_res['freq'] != i_res['go_neg_freq']: + raise Exception("freq/go_neg_freq mismatch") + if i_res['role'] != i_res['go_neg_role']: + raise Exception("role/go_neg_role mismatch") + if 'go_neg_role' in r_res and r_res['role'] != r_res['go_neg_role']: + raise Exception("role/go_neg_role mismatch") if i_res['go_dev_addr'] != r_res['go_dev_addr']: raise Exception("GO Device Address mismatch") @@ -55,13 +63,13 @@ def go_neg_pin(i_dev, r_dev, i_intent=None, r_intent=None, i_method='enter', r_m hwsim_utils.test_connectivity_p2p(r_dev, i_dev) i_dev.dump_monitor() -def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_failure=False, i_go_neg_status=None, i_method='enter', r_method='display'): +def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_failure=False, i_go_neg_status=None, i_method='enter', r_method='display', test_data=True, i_freq=None, r_freq=None): r_dev.p2p_listen() i_dev.p2p_listen() pin = r_dev.wps_read_pin() logger.info("Start GO negotiation " + i_dev.ifname + " -> " + r_dev.ifname) - r_dev.p2p_go_neg_auth(i_dev.p2p_dev_addr(), pin, r_method, go_intent=r_intent) - i_res = i_dev.p2p_go_neg_init(r_dev.p2p_dev_addr(), pin, i_method, timeout=20, go_intent=i_intent, expect_failure=expect_failure) + r_dev.p2p_go_neg_auth(i_dev.p2p_dev_addr(), pin, r_method, go_intent=r_intent, freq=r_freq) + i_res = i_dev.p2p_go_neg_init(r_dev.p2p_dev_addr(), pin, i_method, timeout=20, go_intent=i_intent, expect_failure=expect_failure, freq=i_freq) r_res = r_dev.p2p_go_neg_auth_result(expect_failure=expect_failure) logger.debug("i_res: " + str(i_res)) logger.debug("r_res: " + str(r_res)) @@ -75,7 +83,8 @@ def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_fai if expect_failure: return logger.info("Group formed") - hwsim_utils.test_connectivity_p2p(r_dev, i_dev) + if test_data: + hwsim_utils.test_connectivity_p2p(r_dev, i_dev) return [i_res, r_res] def go_neg_init_pbc(i_dev, r_dev, i_intent, res): @@ -109,32 +118,29 @@ def go_neg_pbc(i_dev, r_dev, i_intent=None, r_intent=None): i_dev.dump_monitor() return [i_res, r_res] -def test_grpform(dev): - """P2P group formation using PIN and authorized connection (init -> GO)""" - go_neg_pin_authorized(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=0) - dev[0].remove_group() +def remove_group(dev1, dev2): + dev1.remove_group() try: - dev[1].remove_group() + dev2.remove_group() except: pass +def test_grpform(dev): + """P2P group formation using PIN and authorized connection (init -> GO)""" + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=15, + r_dev=dev[1], r_intent=0) + check_grpform_results(i_res, r_res) + remove_group(dev[0], dev[1]) + def test_grpform2(dev): """P2P group formation using PIN and authorized connection (resp -> GO)""" go_neg_pin_authorized(i_dev=dev[0], i_intent=0, r_dev=dev[1], r_intent=15) - dev[0].remove_group() - try: - dev[1].remove_group() - except: - pass + remove_group(dev[0], dev[1]) def test_grpform3(dev): """P2P group formation using PIN and re-init GO Negotiation""" go_neg_pin(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=0) - dev[0].remove_group() - try: - dev[1].remove_group() - except: - pass + remove_group(dev[0], dev[1]) def test_grpform_pbc(dev): """P2P group formation using PBC and re-init GO Negotiation""" @@ -142,11 +148,7 @@ def test_grpform_pbc(dev): check_grpform_results(i_res, r_res) if i_res['role'] != 'GO' or r_res['role'] != 'client': raise Exception("Unexpected device roles") - dev[0].remove_group() - try: - dev[1].remove_group() - except: - pass + remove_group(dev[0], dev[1]) def test_both_go_intent_15(dev): """P2P GO Negotiation with both devices using GO intent 15""" @@ -197,3 +199,72 @@ def test_grpform_per_sta_psk_wps(dev): dev[0].remove_group() dev[2].request("DISCONNECT") dev[1].wait_go_ending_session() + +def test_grpform_force_chan_go(dev): + """P2P group formation forced channel selection by GO""" + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=15, + i_freq=2432, + r_dev=dev[1], r_intent=0, + test_data=False) + check_grpform_results(i_res, r_res) + if i_res['freq'] != "2432": + raise Exception("Unexpected channel - did not follow GO's forced channel") + remove_group(dev[0], dev[1]) + +def test_grpform_force_chan_cli(dev): + """P2P group formation forced channel selection by client""" + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=0, + i_freq=2417, + r_dev=dev[1], r_intent=15, + test_data=False) + check_grpform_results(i_res, r_res) + if i_res['freq'] != "2417": + raise Exception("Unexpected channel - did not follow GO's forced channel") + remove_group(dev[0], dev[1]) + +def test_grpform_force_chan_conflict(dev): + """P2P group formation fails due to forced channel mismatch""" + go_neg_pin_authorized(i_dev=dev[0], i_intent=0, i_freq=2422, + r_dev=dev[1], r_intent=15, r_freq=2427, + expect_failure=True, i_go_neg_status=7) + +def test_grpform_pref_chan_go(dev): + """P2P group formation preferred channel selection by GO""" + dev[0].request("SET p2p_pref_chan 81:7") + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=15, + r_dev=dev[1], r_intent=0, + test_data=False) + check_grpform_results(i_res, r_res) + if i_res['freq'] != "2442": + raise Exception("Unexpected channel - did not follow GO's p2p_pref_chan") + remove_group(dev[0], dev[1]) + +def test_grpform_pref_chan_go_overridden(dev): + """P2P group formation preferred channel selection by GO overridden by client""" + dev[1].request("SET p2p_pref_chan 81:7") + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=0, + i_freq=2422, + r_dev=dev[1], r_intent=15, + test_data=False) + check_grpform_results(i_res, r_res) + if i_res['freq'] != "2422": + raise Exception("Unexpected channel - did not follow client's forced channel") + remove_group(dev[0], dev[1]) + +def test_grpform_no_go_freq_forcing_chan(dev): + """P2P group formation with no-GO freq forcing channel""" + dev[1].request("SET p2p_no_go_freq 100-200,300,4000-6000") + [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=0, + r_dev=dev[1], r_intent=15, + test_data=False) + check_grpform_results(i_res, r_res) + if int(i_res['freq']) > 4000: + raise Exception("Unexpected channel - did not follow no-GO freq") + remove_group(dev[0], dev[1]) + +def test_grpform_no_go_freq_conflict(dev): + """P2P group formation fails due to no-GO range forced by client""" + dev[1].request("SET p2p_no_go_freq 2000-3000") + go_neg_pin_authorized(i_dev=dev[0], i_intent=0, i_freq=2422, + r_dev=dev[1], r_intent=15, + expect_failure=True, i_go_neg_status=7) diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 753a487db..a1de52ac2 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -57,6 +57,9 @@ class WpaSupplicant: logger.info("FLUSH to " + self.ifname + " failed: " + res) self.request("SET ignore_old_scan_res 0") self.request("SET external_sim 0") + self.request("SET p2p_add_cli_chan 0") + self.request("SET p2p_no_go_freq ") + self.request("SET p2p_pref_chan ") self.request("P2P_SET per_sta_psk 0") self.request("P2P_SET disabled 0") self.request("P2P_SERVICE_FLUSH") @@ -253,7 +256,7 @@ class WpaSupplicant: vals[name] = value return vals - def group_form_result(self, ev, expect_failure=False): + def group_form_result(self, ev, expect_failure=False, go_neg_res=None): if expect_failure: if "P2P-GROUP-STARTED" in ev: raise Exception("Group formation succeeded when expecting failure") @@ -291,15 +294,26 @@ class WpaSupplicant: if p: res['passphrase'] = p.group(1) res['go_dev_addr'] = s[7] + + if go_neg_res: + exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)' + s = re.split(exp, go_neg_res) + if len(s) < 4: + raise Exception("Could not parse P2P-GO-NEG-SUCCESS") + res['go_neg_role'] = s[2] + res['go_neg_freq'] = s[3] + return res - def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False): + def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) + if freq: + cmd = cmd + ' freq=' + str(freq) if persistent: cmd = cmd + " persistent" if "OK" in self.global_request(cmd): @@ -307,13 +321,22 @@ class WpaSupplicant: raise Exception("P2P_CONNECT (auth) failed") def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): - ev = self.wait_global_event(["P2P-GROUP-STARTED","P2P-GO-NEG-FAILURE"], timeout); + go_neg_res = None + ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", + "P2P-GO-NEG-FAILURE"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") + if "P2P-GO-NEG-SUCCESS" in ev: + go_neg_res = ev + ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout); + if ev is None: + if expect_failure: + return None + raise Exception("Group formation timed out") self.dump_monitor() - return self.group_form_result(ev, expect_failure) + return self.group_form_result(ev, expect_failure, go_neg_res) def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None): if not self.discover_peer(peer): @@ -333,13 +356,22 @@ class WpaSupplicant: if timeout == 0: self.dump_monitor() return None - ev = self.wait_global_event(["P2P-GROUP-STARTED","P2P-GO-NEG-FAILURE"], timeout) + go_neg_res = None + ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", + "P2P-GO-NEG-FAILURE"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") + if "P2P-GO-NEG-SUCCESS" in ev: + go_neg_res = ev + ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) + if ev is None: + if expect_failure: + return None + raise Exception("Group formation timed out") self.dump_monitor() - return self.group_form_result(ev, expect_failure) + return self.group_form_result(ev, expect_failure, go_neg_res) raise Exception("P2P_CONNECT failed") def wait_event(self, events, timeout):