tests: Mesh authentication failure events

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2015-02-08 13:08:32 +02:00
parent bf51f4f82b
commit f77d6d4bd8
3 changed files with 98 additions and 11 deletions

View file

@ -13,7 +13,7 @@ logger = logging.getLogger()
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False, def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
ifname1=None, ifname2=None, config=True): ifname1=None, ifname2=None, config=True, timeout=5):
addr1 = dev1.own_addr() addr1 = dev1.own_addr()
if not dev1group and isinstance(dev1, WpaSupplicant): if not dev1group and isinstance(dev1, WpaSupplicant):
addr1 = dev1.get_driver_status_field('addr') addr1 = dev1.get_driver_status_field('addr')
@ -53,9 +53,9 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
else: else:
dev1.request(cmd) dev1.request(cmd)
if dev2group: if dev2group:
ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5) ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else: else:
ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5) ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None: if ev is None:
raise Exception("dev1->dev2 unicast data delivery failed") raise Exception("dev1->dev2 unicast data delivery failed")
if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev: if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
@ -67,9 +67,9 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
else: else:
dev1.request(cmd) dev1.request(cmd)
if dev2group: if dev2group:
ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5) ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else: else:
ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5) ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None: if ev is None:
raise Exception("dev1->dev2 broadcast data delivery failed") raise Exception("dev1->dev2 broadcast data delivery failed")
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev: if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
@ -81,9 +81,9 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
else: else:
dev2.request(cmd) dev2.request(cmd)
if dev1group: if dev1group:
ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5) ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else: else:
ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5) ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None: if ev is None:
raise Exception("dev2->dev1 unicast data delivery failed") raise Exception("dev2->dev1 unicast data delivery failed")
if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev: if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
@ -95,9 +95,9 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
else: else:
dev2.request(cmd) dev2.request(cmd)
if dev1group: if dev1group:
ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5) ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
else: else:
ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5) ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
if ev is None: if ev is None:
raise Exception("dev2->dev1 broadcast data delivery failed") raise Exception("dev2->dev1 broadcast data delivery failed")
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev: if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
@ -115,7 +115,7 @@ def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1, def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
dev1group=False, dev2group=False, dev1group=False, dev2group=False,
ifname1=None, ifname2=None, config=True): ifname1=None, ifname2=None, config=True, timeout=5):
if dscp: if dscp:
tos = dscp << 2 tos = dscp << 2
if not tos: if not tos:
@ -126,7 +126,8 @@ def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
for i in range(0, max_tries): for i in range(0, max_tries):
try: try:
run_connectivity_test(dev1, dev2, tos, dev1group, dev2group, run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
ifname1, ifname2, config=config) ifname1, ifname2, config=config,
timeout=timeout)
success = True success = True
break break
except Exception, e: except Exception, e:

View file

@ -507,3 +507,87 @@ def _test_wpas_mesh_open_5ghz(dev, apdev):
# Test connectivity 0->1 and 1->0 # Test connectivity 0->1 and 1->0
hwsim_utils.test_connectivity(dev[0], dev[1]) hwsim_utils.test_connectivity(dev[0], dev[1])
def test_wpas_mesh_password_mismatch(dev, apdev):
"""Mesh network and one device with mismatching password"""
check_mesh_support(dev[0], secure=True)
dev[0].request("SET sae_groups ")
id = add_mesh_secure_net(dev[0])
dev[0].mesh_group_add(id)
dev[1].request("SET sae_groups ")
id = add_mesh_secure_net(dev[1])
dev[1].mesh_group_add(id)
dev[2].request("SET sae_groups ")
id = add_mesh_secure_net(dev[2])
dev[2].set_network_quoted(id, "psk", "wrong password")
dev[2].mesh_group_add(id)
# The two peers with matching password need to be able to connect
check_mesh_group_added(dev[0])
check_mesh_group_added(dev[1])
check_mesh_peer_connected(dev[0])
check_mesh_peer_connected(dev[1])
ev = dev[2].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
if ev is None:
raise Exception("dev2 did not report auth failure (1)")
ev = dev[2].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
if ev is None:
raise Exception("dev2 did not report auth failure (2)")
ev = dev[0].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=1)
if ev is None:
raise Exception("dev0 did not report auth failure")
if "addr=" + dev[2].own_addr() not in ev:
raise Exception("Unexpected peer address in dev0 event: " + ev)
ev = dev[1].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=1)
if ev is None:
raise Exception("dev1 did not report auth failure")
if "addr=" + dev[2].own_addr() not in ev:
raise Exception("Unexpected peer address in dev1 event: " + ev)
hwsim_utils.test_connectivity(dev[0], dev[1])
for i in range(2):
try:
hwsim_utils.test_connectivity(dev[i], dev[2], timeout=1)
raise Exception("Data connectivity test passed unexpectedly")
except Exception, e:
if "data delivery failed" not in str(e):
raise
def test_wpas_mesh_password_mismatch_retry(dev, apdev, params):
"""Mesh password mismatch and retry [long]"""
if not params['long']:
raise HwsimSkip("Skip test case with long duration due to --long not specified")
check_mesh_support(dev[0], secure=True)
dev[0].request("SET sae_groups ")
id = add_mesh_secure_net(dev[0])
dev[0].mesh_group_add(id)
dev[1].request("SET sae_groups ")
id = add_mesh_secure_net(dev[1])
dev[1].set_network_quoted(id, "psk", "wrong password")
dev[1].mesh_group_add(id)
# Check for mesh joined
check_mesh_group_added(dev[0])
check_mesh_group_added(dev[1])
for i in range(4):
ev = dev[0].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
if ev is None:
raise Exception("dev0 did not report auth failure (%d)" % i)
ev = dev[1].wait_event(["MESH-SAE-AUTH-FAILURE"], timeout=20)
if ev is None:
raise Exception("dev1 did not report auth failure (%d)" % i)
ev = dev[0].wait_event(["MESH-SAE-AUTH-BLOCKED"], timeout=10)
if ev is None:
raise Exception("dev0 did not report auth blocked")
ev = dev[1].wait_event(["MESH-SAE-AUTH-BLOCKED"], timeout=10)
if ev is None:
raise Exception("dev1 did not report auth blocked")

View file

@ -313,6 +313,8 @@ def main():
# duration test case on a single VM while all other VMs have already # duration test case on a single VM while all other VMs have already
# completed their work. # completed their work.
long = [ "ap_roam_open", long = [ "ap_roam_open",
"wpas_mesh_password_mismatch_retry",
"wpas_mesh_password_mismatch",
"hostapd_oom_wpa2_psk_connect", "hostapd_oom_wpa2_psk_connect",
"ap_hs20_fetch_osu_stop", "ap_hs20_fetch_osu_stop",
"ap_roam_wpa2_psk", "ap_roam_wpa2_psk",