tests: Add support for wlantest for remote hwsim tests

Use a monitor interface given in the command line that is not also a
station or an AP as a monitor running wlantest on the channel used by
the test. This makes all the tests that use wlantest available for
execution on real hardware on remote hosts.

Signed-off-by: Jonathan Afek <jonathanx.afek@intel.com>
This commit is contained in:
Jonathan Afek 2016-05-19 16:06:49 +03:00 committed by Jouni Malinen
parent 0335ff7f69
commit 8efc83d4e7
12 changed files with 217 additions and 91 deletions

View file

@ -503,6 +503,9 @@ def main():
del hapd
hapd = None
# Use None here since this instance of Wlantest() will never be
# used for remote host hwsim tests on real hardware.
Wlantest.setup(None)
wt = Wlantest()
rename_log(args.logdir, 'hwsim0.pcapng', name, wt)
rename_log(args.logdir, 'hwsim0', name, wt)

View file

@ -28,10 +28,6 @@ def check_cipher(dev, ap, cipher):
hwsim_utils.test_connectivity(dev, hapd)
def check_group_mgmt_cipher(dev, ap, cipher):
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
if cipher not in dev.get_capability("group_mgmt"):
raise HwsimSkip("Cipher %s not supported" % cipher)
params = { "ssid": "test-wpa2-psk-pmf",
@ -42,6 +38,12 @@ def check_group_mgmt_cipher(dev, ap, cipher):
"rsn_pairwise": "CCMP",
"group_mgmt_cipher": cipher }
hapd = hostapd.add_ap(ap, params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev.connect("test-wpa2-psk-pmf", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256",
pairwise="CCMP", group="CCMP", scan_freq="2412")

View file

@ -298,7 +298,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev):
ssid = "test-hs20-ap1"
params['ssid'] = ssid
params['hessid'] = bssid
hostapd.add_ap(apdev[0], params)
hapd0 = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params()
@ -312,6 +312,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev):
dev[0].hs20_enable()
Wlantest.setup(hapd0)
wt = Wlantest()
wt.flush()

View file

@ -17,13 +17,14 @@ from wpasupplicant import WpaSupplicant
def test_ap_pmf_required(dev, apdev):
"""WPA2-PSK AP with PMF required"""
ssid = "test-pmf-required"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK-SHA256":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
@ -53,13 +54,14 @@ def test_ap_pmf_required(dev, apdev):
def test_ap_pmf_optional(dev, apdev):
"""WPA2-PSK AP with PMF optional"""
ssid = "test-pmf-optional"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -75,13 +77,14 @@ def test_ap_pmf_optional(dev, apdev):
def test_ap_pmf_optional_2akm(dev, apdev):
"""WPA2-PSK AP with PMF optional (2 AKMs)"""
ssid = "test-pmf-optional-2akm"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -101,11 +104,12 @@ def test_ap_pmf_optional_2akm(dev, apdev):
def test_ap_pmf_negative(dev, apdev):
"""WPA2-PSK AP without PMF (negative test)"""
ssid = "test-pmf-negative"
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -123,13 +127,14 @@ def test_ap_pmf_negative(dev, apdev):
def test_ap_pmf_assoc_comeback(dev, apdev):
"""WPA2-PSK AP with PMF association comeback"""
ssid = "assoc-comeback"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -146,13 +151,14 @@ def test_ap_pmf_assoc_comeback(dev, apdev):
def test_ap_pmf_assoc_comeback2(dev, apdev):
"""WPA2-PSK AP with PMF association comeback (using DROP_SA)"""
ssid = "assoc-comeback"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
if "OK" not in dev[0].request("DROP_SA"):
@ -167,9 +173,6 @@ def test_ap_pmf_sta_sa_query(dev, apdev):
"""WPA2-PSK AP with station using SA Query"""
ssid = "assoc-comeback"
addr = dev[0].own_addr()
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
@ -187,6 +190,11 @@ def test_ap_pmf_sta_sa_query(dev, apdev):
bssid = wpas.own_addr()
wpas.dump_monitor()
Wlantest.setup(wpas)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -260,9 +268,6 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
"""WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames"""
ssid = "deauth-attack"
addr = dev[0].own_addr()
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
@ -279,6 +284,11 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
wpas.connect_network(id)
bssid = wpas.own_addr()
Wlantest.setup(wpas)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@ -348,13 +358,14 @@ def test_ap_pmf_optional_eap(dev, apdev):
def test_ap_pmf_required_sha1(dev, apdev):
"""WPA2-PSK AP with PMF required with SHA1 AKM"""
ssid = "test-pmf-required-sha1"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
@ -373,15 +384,16 @@ def test_ap_pmf_toggle(dev, apdev):
def _test_ap_pmf_toggle(dev, apdev):
ssid = "test-pmf-optional"
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
params["assoc_sa_query_max_timeout"] = "1"
params["assoc_sa_query_retry_timeout"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
bssid = apdev[0]['bssid']
addr = dev[0].own_addr()
dev[0].request("SET reassoc_same_bss_optim 1")

View file

@ -18,6 +18,7 @@ def check_qos_map(ap, hapd, dev, sta, dscp, tid, ap_tid=None):
if not ap_tid:
ap_tid = tid
bssid = ap['bssid']
Wlantest.setup(hapd)
wt = Wlantest()
wt.clear_sta_counters(bssid, sta)
hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False)

View file

@ -56,7 +56,8 @@ def connect_2sta_open(dev, hapd, scan_freq="2412"):
dev[1].connect("test-open", key_mgmt="NONE", scan_freq=scan_freq)
connectivity(dev, hapd)
def wlantest_setup():
def wlantest_setup(hapd):
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
@ -165,7 +166,7 @@ def check_tdls_link(sta0, sta1, connected=True):
def test_ap_tdls_discovery(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS discovery"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("TDLS_DISCOVER " + dev[1].p2p_interface_addr())
time.sleep(0.2)
@ -173,7 +174,7 @@ def test_ap_tdls_discovery(dev, apdev):
def test_ap_wpa2_tdls(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -183,7 +184,7 @@ def test_ap_wpa2_tdls(dev, apdev):
def test_ap_wpa2_tdls_concurrent_init(dev, apdev):
"""Concurrent TDLS setup initiation"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x80")
setup_tdls(dev[1], dev[0], hapd, reverse=True)
@ -191,7 +192,7 @@ def test_ap_wpa2_tdls_concurrent_init(dev, apdev):
def test_ap_wpa2_tdls_concurrent_init2(dev, apdev):
"""Concurrent TDLS setup initiation (reverse)"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x80")
setup_tdls(dev[0], dev[1], hapd)
@ -199,7 +200,7 @@ def test_ap_wpa2_tdls_concurrent_init2(dev, apdev):
def test_ap_wpa2_tdls_decline_resp(dev, apdev):
"""Decline TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x200")
setup_tdls(dev[1], dev[0], hapd, expect_fail=True)
@ -207,7 +208,7 @@ def test_ap_wpa2_tdls_decline_resp(dev, apdev):
def test_ap_wpa2_tdls_long_lifetime(dev, apdev):
"""TDLS with long TPK lifetime"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x40")
setup_tdls(dev[1], dev[0], hapd)
@ -215,7 +216,7 @@ def test_ap_wpa2_tdls_long_lifetime(dev, apdev):
def test_ap_wpa2_tdls_long_frame(dev, apdev):
"""TDLS with long setup/teardown frames"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x1")
dev[1].request("SET tdls_testing 0x1")
@ -226,7 +227,7 @@ def test_ap_wpa2_tdls_long_frame(dev, apdev):
def test_ap_wpa2_tdls_reneg(dev, apdev):
"""Renegotiate TDLS link"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[1], dev[0], hapd)
setup_tdls(dev[0], dev[1], hapd)
@ -234,7 +235,7 @@ def test_ap_wpa2_tdls_reneg(dev, apdev):
def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev):
"""Incorrect TPK lifetime in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x10")
setup_tdls(dev[0], dev[1], hapd, expect_fail=True)
@ -242,7 +243,7 @@ def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev):
def test_ap_wpa2_tdls_diff_rsnie(dev, apdev):
"""TDLS with different RSN IEs"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x2")
setup_tdls(dev[1], dev[0], hapd)
@ -251,7 +252,7 @@ def test_ap_wpa2_tdls_diff_rsnie(dev, apdev):
def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
@ -261,7 +262,7 @@ def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev):
def test_ap_wpa2_tdls_wrong_tpk_m3_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Confirm"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
@ -274,7 +275,7 @@ def test_ap_wpa_tdls(dev, apdev):
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_params(ssid="test-wpa-psk",
passphrase="12345678"))
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -286,7 +287,7 @@ def test_ap_wpa_mixed_tdls(dev, apdev):
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_mixed_params(ssid="test-wpa-mixed-psk",
passphrase="12345678"))
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa_psk_mixed(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -296,7 +297,7 @@ def test_ap_wep_tdls(dev, apdev):
"""WEP AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0],
{ "ssid": "test-wep", "wep_key0": '"hello"' })
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wep(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -305,7 +306,7 @@ def test_ap_wep_tdls(dev, apdev):
def test_ap_open_tdls(dev, apdev):
"""Open AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_open(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -321,7 +322,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev):
params['bridge'] = 'ap-br0'
hapd = hostapd.add_ap(apdev[0], params)
hostapd.add_ap(apdev[1], params)
wlantest_setup()
wlantest_setup(hapd)
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
dev[0].connect(ssid, psk=passphrase, scan_freq="2412",
@ -343,7 +344,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev):
def test_ap_wpa2_tdls_responder_teardown(dev, apdev):
"""TDLS teardown from responder with WPA2-PSK AP"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd, responder=True)
@ -362,7 +363,7 @@ def test_ap_open_tdls_vht(dev, apdev):
"vht_oper_centr_freq_seg0_idx": "0" }
try:
hapd = hostapd.add_ap(apdev[0], params)
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@ -392,7 +393,7 @@ def test_ap_open_tdls_vht80(dev, apdev):
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=80 MHz" not in sig:
@ -436,7 +437,7 @@ def test_ap_open_tdls_vht80plus80(dev, apdev):
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "FREQUENCY=5180" not in sig:
@ -492,7 +493,7 @@ def test_ap_open_tdls_vht160(dev, apdev):
if "5490" in r and "DFS" in r:
raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
raise Exception("AP setup timed out")
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5520")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=160 MHz" not in sig:
@ -539,7 +540,7 @@ def test_tdls_chan_switch(dev, apdev):
def test_ap_tdls_link_status(dev, apdev):
"""Check TDLS link status between two stations"""
hapd = start_ap_wpa2_psk(apdev[0])
wlantest_setup()
wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
check_tdls_link(dev[0], dev[1], connected=False)
setup_tdls(dev[0], dev[1], hapd)

View file

@ -231,7 +231,6 @@ def test_autogo_pbc(dev):
def test_autogo_tdls(dev):
"""P2P autonomous GO and two clients using TDLS"""
wt = Wlantest()
go = dev[0]
logger.info("Start autonomous GO with fixed parameters " + go.ifname)
id = go.add_network()
@ -241,6 +240,8 @@ def test_autogo_tdls(dev):
go.set_network(id, "disabled", "2")
res = go.p2p_start_go(persistent=id, freq="2462")
logger.debug("res: " + str(res))
Wlantest.setup(go, True)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
connect_cli(go, dev[1], social=True, freq=2462)

View file

@ -50,15 +50,17 @@ def test_peerkey_unknown_peer(dev, apdev):
def test_peerkey_pairwise_mismatch(dev, apdev):
"""RSN TKIP+CCMP AP and PeerKey between two STAs using different ciphers"""
skip_with_fips(dev[0])
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
ssid = "test-peerkey"
passphrase = "12345678"
params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
params['peerkey'] = "1"
params['rsn_pairwise'] = "TKIP CCMP"
hostapd.add_ap(apdev[0], params)
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True,
pairwise="CCMP")

View file

@ -185,9 +185,6 @@ def test_wnm_sleep_mode_ap_oom(dev, apdev):
def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
"""WNM Sleep Mode - RSN with PMF"""
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
@ -197,6 +194,11 @@ def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
params["bss_transition"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)

View file

@ -4,7 +4,9 @@
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import re
import os
import posixpath
import time
import subprocess
import logging
@ -13,44 +15,133 @@ import wpaspy
logger = logging.getLogger()
class Wlantest:
remote_host = None
setup_params = None
exe_thread = None
exe_res = []
monitor_mod = None
setup_done = False
@classmethod
def stop_remote_wlantest(cls):
if cls.exe_thread is None:
# Local flow - no need for remote operations
return
cls.remote_host.execute(["killall", "-9", "wlantest"])
cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
cls.exe_thread = None
cls.exe_res = []
@classmethod
def reset_remote_wlantest(cls):
cls.stop_remote_wlantest()
cls.remote_host = None
cls.setup_params = None
cls.exe_thread = None
cls.exe_res = []
cls.monitor_mod = None
cls.setup_done = False
@classmethod
def start_remote_wlantest(cls):
if cls.remote_host is None:
# Local flow - no need for remote operations
return
if cls.exe_thread is not None:
raise Exception("Cannot start wlantest twice")
log_dir = cls.setup_params['log_dir']
ifaces = re.split('; | |, ', cls.remote_host.ifname)
ifname = ifaces[0]
exe = cls.setup_params["wlantest"]
tc_name = cls.setup_params["tc_name"]
base_log_name = tc_name + "_wlantest_" + \
cls.remote_host.name + "_" + ifname
log_file = posixpath.join(log_dir, base_log_name + ".log")
pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
pcap_file, log_file)
cls.remote_host.add_log(log_file)
cls.remote_host.add_log(pcap_file)
cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res)
# Give wlantest a chance to start working
time.sleep(1)
@classmethod
def register_remote_wlantest(cls, host, setup_params, monitor_mod):
if cls.remote_host is not None:
raise Exception("Cannot register remote wlantest twice")
cls.remote_host = host
cls.setup_params = setup_params
cls.monitor_mod = monitor_mod
status, buf = host.execute(["which", setup_params['wlantest']])
if status != 0:
raise Exception(host.name + " - wlantest: " + buf)
status, buf = host.execute(["which", setup_params['wlantest_cli']])
if status != 0:
raise Exception(host.name + " - wlantest_cli: " + buf)
@classmethod
def chan_from_wpa(cls, wpa, is_p2p=False):
if cls.monitor_mod is None:
return
m = cls.monitor_mod
return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
@classmethod
def setup(cls, wpa, is_p2p=False):
cls.chan_from_wpa(wpa, is_p2p)
cls.start_remote_wlantest()
cls.setup_done = True
def __init__(self):
if not self.setup_done:
raise Exception("Cannot create Wlantest instance before setup()")
if os.path.isfile('../../wlantest/wlantest_cli'):
self.wlantest_cli = '../../wlantest/wlantest_cli'
else:
self.wlantest_cli = 'wlantest_cli'
def cli_cmd(self, params):
if self.remote_host is not None:
exe = self.setup_params["wlantest_cli"]
ret = self.remote_host.execute([exe] + params)
if ret[0] != 0:
raise Exception("wlantest_cli failed")
return ret[1]
else:
return subprocess.check_output([self.wlantest_cli] + params)
def flush(self):
res = subprocess.check_output([self.wlantest_cli, "flush"])
res = self.cli_cmd(["flush"])
if "FAIL" in res:
raise Exception("wlantest_cli flush failed")
def relog(self):
res = subprocess.check_output([self.wlantest_cli, "relog"])
res = self.cli_cmd(["relog"])
if "FAIL" in res:
raise Exception("wlantest_cli relog failed")
def add_passphrase(self, passphrase):
res = subprocess.check_output([self.wlantest_cli, "add_passphrase",
passphrase])
res = self.cli_cmd(["add_passphrase", passphrase])
if "FAIL" in res:
raise Exception("wlantest_cli add_passphrase failed")
def add_wepkey(self, key):
res = subprocess.check_output([self.wlantest_cli, "add_wepkey", key])
res = self.cli_cmd(["add_wepkey", key])
if "FAIL" in res:
raise Exception("wlantest_cli add_key failed")
def info_bss(self, field, bssid):
res = subprocess.check_output([self.wlantest_cli, "info_bss",
field, bssid])
res = self.cli_cmd(["info_bss", field, bssid])
if "FAIL" in res:
raise Exception("Could not get BSS info from wlantest for " + bssid)
return res
def get_bss_counter(self, field, bssid):
try:
res = subprocess.check_output([self.wlantest_cli, "get_bss_counter",
field, bssid]);
res = self.cli_cmd(["get_bss_counter", field, bssid])
except Exception, e:
return 0
if "FAIL" in res:
@ -58,36 +149,30 @@ class Wlantest:
return int(res)
def clear_bss_counters(self, bssid):
subprocess.call([self.wlantest_cli, "clear_bss_counters", bssid],
stdout=open('/dev/null', 'w'));
self.cli_cmd(["clear_bss_counters", bssid])
def info_sta(self, field, bssid, addr):
res = subprocess.check_output([self.wlantest_cli, "info_sta",
field, bssid, addr])
res = self.cli_cmd(["info_sta", field, bssid, addr])
if "FAIL" in res:
raise Exception("Could not get STA info from wlantest for " + addr)
return res
def get_sta_counter(self, field, bssid, addr):
res = subprocess.check_output([self.wlantest_cli, "get_sta_counter",
field, bssid, addr]);
res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def clear_sta_counters(self, bssid, addr):
res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
bssid, addr]);
res = self.cli_cmd(["clear_sta_counters", bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
def tdls_clear(self, bssid, addr1, addr2):
res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
bssid, addr1, addr2]);
self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
def get_tdls_counter(self, field, bssid, addr1, addr2):
res = subprocess.check_output([self.wlantest_cli, "get_tdls_counter",
field, bssid, addr1, addr2]);
res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
@ -139,15 +224,13 @@ class Wlantest:
raise Exception("Unexpected STA key_mgmt")
def get_tx_tid(self, bssid, addr, tid):
res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
bssid, addr, str(tid)]);
res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def get_rx_tid(self, bssid, addr, tid):
res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
bssid, addr, str(tid)]);
res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)

View file

@ -20,6 +20,8 @@ setup_params = { "setup_hw" : "./tests/setup_hw.sh",
"hostapd" : "./tests/hostapd",
"wpa_supplicant" : "./tests/wpa_supplicant",
"iperf" : "iperf",
"wlantest" : "./tests/wlantest",
"wlantest_cli" : "./tests/wlantest_cli",
"country" : "US",
"log_dir" : "/tmp/",
"ipv4_test_net" : "192.168.12.0",

View file

@ -11,6 +11,7 @@ import config
import rutils
import monitor
import traceback
import wlantest
import logging
logger = logging.getLogger()
@ -45,6 +46,15 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
monitor.add(dut_host, monitors)
monitor.run(dut_host, setup_params)
monitor_hosts = monitor.create(devices, setup_params, refs, duts,
monitors)
mon = None
if len(monitor_hosts) > 0:
mon = monitor_hosts[0]
wlantest.Wlantest.reset_remote_wlantest()
wlantest.Wlantest.register_remote_wlantest(mon, setup_params,
monitor)
# run hostapd/wpa_supplicant
for ref_host in ref_hosts:
rutils.run_wpasupplicant(ref_host, setup_params)
@ -83,6 +93,9 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
if mon is not None:
wlantest.Wlantest.reset_remote_wlantest()
mon.get_logs(local_log_dir)
return ""
except:
@ -105,4 +118,7 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
if mon is not None:
wlantest.Wlantest.reset_remote_wlantest()
mon.get_logs(local_log_dir)
raise