hostapd/tests/hwsim/hwsim_utils.py
Johannes Berg d0007ac9d7 tests: Add test to check disconnect in powersave
The kernel had two bugs in this area (one in hwsim and a more important
one in mac80211). Add a test to make sure we can disconnect without any
kernel issues while in powersave.

Also make sure that the TIM bit gets set and cleared again (by checking
with tshark).

Signed-off-by: Johannes Berg <johannes.berg@intel.com>
2015-02-21 16:07:52 +02:00

160 lines
5.7 KiB
Python

# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import subprocess
import time
import logging
logger = logging.getLogger()
from wpasupplicant import WpaSupplicant
def _data_test_request(dev, cmd, group):
    """Send cmd to dev, using group_request() when group is true."""
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)


def _data_test_wait_rx(dev, group, timeout):
    """Wait for a DATA-TEST-RX event on dev; returns None if none arrived."""
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)


def _data_test_enable(dev, group, ifname):
    """Issue DATA_TEST_CONFIG 1 on dev (optionally with ifname=<ifname>)."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    res = _data_test_request(dev, cmd, group)
    if "OK" not in res:
        raise Exception("Failed to enable data test functionality")


def _data_test_check_delivery(tx_dev, tx_group, rx_dev, rx_group,
                              dst, src, tos, timeout, desc):
    """Send one test frame from tx_dev and verify rx_dev reports it.

    desc is the human-readable direction/type (e.g. "dev1->dev2 unicast")
    used to build the exception messages.
    """
    cmd = "DATA_TEST_TX {} {} {}".format(dst, src, tos)
    _data_test_request(tx_dev, cmd, tx_group)
    ev = _data_test_wait_rx(rx_dev, rx_group, timeout)
    if ev is None:
        raise Exception(desc + " data delivery failed")
    if "DATA-TEST-RX {} {}".format(dst, src) not in ev:
        raise Exception("Unexpected " + desc + " data result")


def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5):
    """Exchange unicast and broadcast test frames between dev1 and dev2.

    Unless config is false, DATA_TEST_CONFIG 1 is first issued on both
    devices (with " ifname=<name>" appended when ifname1/ifname2 is given).
    Then, for dev1->dev2 and afterwards dev2->dev1, a unicast frame and a
    broadcast (ff:ff:ff:ff:ff:ff) frame are sent with DATA_TEST_TX and the
    receiver is expected to report a matching DATA-TEST-RX event.

    dev1group/dev2group select group_request()/wait_group_event() instead
    of request()/wait_event() for the respective device. tos is passed
    through as the last DATA_TEST_TX argument and timeout is passed to the
    event wait call (presumably seconds -- confirm against the device API).

    Raises Exception if enabling the test mode fails, if no DATA-TEST-RX
    event arrives in time, or if the event does not carry the expected
    destination/source addresses. When config is true, DATA_TEST_CONFIG 0
    is issued on both devices on every exit path.
    """
    # For a non-group WpaSupplicant device, the address from the driver
    # status field 'addr' replaces own_addr().
    addr1 = dev1.own_addr()
    if not dev1group and isinstance(dev1, WpaSupplicant):
        addr1 = dev1.get_driver_status_field('addr')
    addr2 = dev2.own_addr()
    if not dev2group and isinstance(dev2, WpaSupplicant):
        addr2 = dev2.get_driver_status_field('addr')

    dev1.dump_monitor()
    dev2.dump_monitor()

    broadcast = "ff:ff:ff:ff:ff:ff"
    try:
        if config:
            _data_test_enable(dev1, dev1group, ifname1)
            _data_test_enable(dev2, dev2group, ifname2)

        _data_test_check_delivery(dev1, dev1group, dev2, dev2group,
                                  addr2, addr1, tos, timeout,
                                  "dev1->dev2 unicast")
        _data_test_check_delivery(dev1, dev1group, dev2, dev2group,
                                  broadcast, addr1, tos, timeout,
                                  "dev1->dev2 broadcast")
        _data_test_check_delivery(dev2, dev2group, dev1, dev1group,
                                  addr1, addr2, tos, timeout,
                                  "dev2->dev1 unicast")
        _data_test_check_delivery(dev2, dev2group, dev1, dev1group,
                                  broadcast, addr2, tos, timeout,
                                  "dev2->dev1 broadcast")
    finally:
        # Disable is attempted even if enabling failed part-way through.
        if config:
            _data_test_request(dev1, "DATA_TEST_CONFIG 0", dev1group)
            _data_test_request(dev2, "DATA_TEST_CONFIG 0", dev2group)
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5):
    """Run run_connectivity_test() up to max_tries times.

    If dscp is given (and non-zero) it overrides tos with dscp << 2; a
    missing tos defaults to 0. The remaining arguments are passed through
    to run_connectivity_test() unchanged.

    Returns as soon as one attempt succeeds. Between failed attempts the
    function sleeps for one second. If no attempt succeeds, an Exception
    wrapping the last error is raised.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    last_err = None
    for attempt in range(max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout)
            return
        except Exception as e:
            # "except X as e" is valid on both Python 2.6+ and Python 3,
            # unlike the Python 2-only "except X, e" form.
            last_err = e
            # No point in sleeping after the final attempt.
            if attempt + 1 < max_tries:
                time.sleep(1)
    raise Exception(last_err)
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1):
    """Run test_connectivity() with ifname passed as ifname2 (dev2 side)."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos, max_tries=max_tries,
                      ifname2=ifname)
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with both devices in group mode."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with dev1 in group mode and dev2 not."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with default (non-group) settings."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)
# Powersave mode values (presumably the values accepted by set_powersave()
# and the hwsim debugfs "ps" file -- confirm against the hwsim driver).
PS_DISABLED = 0
PS_ENABLED = 1
PS_AUTO_POLL = 2
PS_MANUAL_POLL = 3
def set_powersave(dev, val):
    """Write the integer val to the hwsim debugfs "ps" file of dev's phy.

    The phy name is read from the device's "phyname" driver status field.
    Errors from opening or writing the debugfs file propagate to the
    caller.
    """
    phy = dev.get_driver_status_field("phyname")
    fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
    # The context manager closes the file even if the write raises.
    with open(fname, 'w') as psf:
        psf.write('%d\n' % val)