tests: Additional GAS server coverage

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
This commit is contained in:
Jouni Malinen 2017-02-07 15:46:20 +02:00 committed by Jouni Malinen
parent 65ab7eb1fa
commit 008aa15abc

View file

@ -1386,6 +1386,55 @@ def _test_gas_anqp_address3_ap_non_compliant(dev, apdev, params):
if res[1] != bssid: if res[1] != bssid:
raise Exception("GAS response used unexpected Address3 field value: " + res[1]) raise Exception("GAS response used unexpected Address3 field value: " + res[1])
def test_gas_anqp_address3_pmf(dev, apdev):
"""GAS/ANQP query using IEEE 802.11 compliant Address 3 value with PMF"""
try:
_test_gas_anqp_address3_pmf(dev, apdev)
finally:
dev[0].request("SET gas_address3 0")
def _test_gas_anqp_address3_pmf(dev, apdev):
hapd = start_ap(apdev[0])
bssid = apdev[0]['bssid']
hapd.set("gas_comeback_delay", "2")
hapd.set("gas_address3", "1")
if "OK" not in dev[0].request("SET gas_address3 1"):
raise Exception("Failed to set gas_address3")
dev[0].scan_for_bss(bssid, freq="2412")
dev[0].connect("test-gas", key_mgmt="WPA-EAP", eap="TTLS",
identity="DOMAIN\mschapv2 user", anonymous_identity="ttls",
password="password", phase2="auth=MSCHAPV2",
ca_cert="auth_serv/ca.pem", scan_freq="2412",
ieee80211w="2")
if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
raise Exception("ANQP_GET command failed")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
if ev is None:
raise Exception("GAS query start timed out")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("GAS query timed out")
ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
if ev is None or "Venue Name" not in ev:
raise Exception("Did not receive Venue Name")
ev = dev[0].wait_event(["ANQP-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("ANQP-QUERY-DONE event not seen")
if "result=SUCCESS" not in ev:
raise Exception("Unexpected result: " + ev)
req = dev[0].request("GAS_REQUEST " + bssid + " 42 000102000101")
if "FAIL" in req:
raise Exception("GAS query request rejected")
expect_gas_result(dev[0], "FAILURE", "59")
def test_gas_prot_vs_not_prot(dev, apdev, params): def test_gas_prot_vs_not_prot(dev, apdev, params):
"""GAS/ANQP query protected vs. not protected""" """GAS/ANQP query protected vs. not protected"""
hapd = start_ap(apdev[0]) hapd = start_ap(apdev[0])
@ -1569,3 +1618,222 @@ def test_gas_anqp_capab_list(dev, apdev):
continue continue
if i not in ids: if i not in ids:
raise Exception("Unexpected Capability List ANQP-element value (missing %d): %s" % (i, bss['anqp_capability_list'])) raise Exception("Unexpected Capability List ANQP-element value (missing %d): %s" % (i, bss['anqp_capability_list']))
def test_gas_server_oom(dev, apdev):
"""GAS server OOM"""
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
params['gas_comeback_delay'] = "5"
hapd = hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
tests = [ "ap_sta_add;gas_dialog_create",
"=gas_dialog_create",
"wpabuf_alloc_copy;gas_serv_rx_gas_comeback_req" ]
for t in tests:
with alloc_fail(hapd, 1, t):
if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
raise Exception("ANQP_GET command failed")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
if ev is None:
raise Exception("No GAS-QUERY-DONE seen")
dev[0].dump_monitor()
hapd.set("gas_comeback_delay", "0")
tests = [ "gas_serv_build_gas_resp_payload" ]
for t in tests:
with alloc_fail(hapd, 1, t):
if "OK" not in dev[0].request("ANQP_GET " + bssid + " 258"):
raise Exception("ANQP_GET command failed")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
if ev is None:
raise Exception("No GAS-QUERY-DONE seen")
dev[0].dump_monitor()
with alloc_fail(hapd, 1,
"gas_build_initial_resp;gas_serv_rx_gas_initial_req"):
req = dev[0].request("GAS_REQUEST " + bssid + " 42 000102000101")
if "FAIL" in req:
raise Exception("GAS query request rejected")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
if ev is None:
raise Exception("No GAS-QUERY-DONE seen")
dev[0].dump_monitor()
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5")
if "OK" not in wpas.request("P2P_SET listen_channel 1"):
raise Exception("Failed to set listen channel")
if "OK" not in wpas.p2p_listen():
raise Exception("Failed to start listen state")
if "FAIL" in wpas.request("SET ext_mgmt_frame_handling 1"):
raise Exception("Failed to enable external management frame handling")
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST, 1)
req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
with alloc_fail(hapd, 1,
"gas_anqp_build_comeback_resp_buf;gas_serv_rx_gas_comeback_req"):
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
wait_fail_trigger(hapd, "GET_ALLOC_FAIL")
def test_gas_anqp_overrides(dev, apdev):
"""GAS and ANQP overrides"""
params = { "ssid": "gas/anqp",
"interworking": "1",
"anqp_elem": [ "257:111111",
"258:222222",
"260:333333",
"261:444444",
"262:555555",
"263:666666",
"264:777777",
"268:888888",
"275:999999" ] }
hapd = hostapd.add_ap(apdev[0], params)
bssid = apdev[0]['bssid']
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
if "OK" not in dev[0].request("ANQP_GET " + bssid + " 257,258,260,261,262,263,264,268,275"):
raise Exception("ANQP_GET command failed")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("GAS query timed out")
for i in range(9):
ev = dev[0].wait_event(["RX-ANQP"], timeout=5)
if ev is None:
raise Exception("ANQP response not seen")
def test_gas_no_dialog_token_match(dev, apdev):
"""GAS and no dialog token match for comeback request"""
hapd = start_ap(apdev[0])
hapd.set("gas_frag_limit", "50")
bssid = apdev[0]['bssid']
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5")
if "OK" not in wpas.request("P2P_SET listen_channel 1"):
raise Exception("Failed to set listen channel")
if "OK" not in wpas.p2p_listen():
raise Exception("Failed to start listen state")
if "FAIL" in wpas.request("SET ext_mgmt_frame_handling 1"):
raise Exception("Failed to enable external management frame handling")
anqp_query = struct.pack('<HHHHHHHHHH', 256, 16, 257, 258, 260, 261, 262, 263, 264, 268)
gas = struct.pack('<H', len(anqp_query)) + anqp_query
dialog_token = 100
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + anqp_adv_proto() + gas
req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
if resp is None:
raise Exception("MGMT-RX timeout")
if 'payload' not in resp:
raise Exception("Missing payload")
gresp = parse_gas(resp['payload'])
if gresp['dialog_token'] != dialog_token:
raise Exception("Dialog token mismatch")
status_code = gresp['status_code']
if status_code != 0:
raise Exception("Unexpected status code {}".format(status_code))
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST,
dialog_token + 1)
req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(msg))
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
resp = wpas.mgmt_rx()
if resp is None:
raise Exception("MGMT-RX timeout")
if 'payload' not in resp:
raise Exception("Missing payload")
gresp = parse_gas(resp['payload'])
status_code = gresp['status_code']
if status_code != 60:
raise Exception("Unexpected failure status code {}".format(status_code))
def test_gas_vendor_spec_errors(dev, apdev):
"""GAS and vendor specific request error cases"""
bssid = apdev[0]['bssid']
params = hs20_ap_params()
params['hessid'] = bssid
params['osu_server_uri'] = "uri"
params['hs20_icon'] = "32:32:eng:image/png:icon32:/tmp/icon32.png"
del params['nai_realm']
hapd = hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
tests = [ "00 12340000",
"00 dddd0300506fff",
"00 dddd0400506fffff",
"00 dddd0400506f9aff",
"00 dddd0400506f9a11",
"00 dddd0600506f9a11ff00",
"00 dddd0600506f9a110600",
"00 dddd0600506f9a110600",
"00 dddd0700506f9a11060000",
"00 dddd0700506f9a110600ff",
"00 dddd0800506f9a110600ff00",
"00 dddd0900506f9a110600ff0000",
"00 dddd0900506f9a110600ff0001",
"00 dddd0900506f9a110600ffff00",
"00 dddd0a00506f9a110600ff00013b",
"00 dddd0700506f9a110100ff",
"00 dddd0700506f9a11010008",
"00 dddd14",
"00 dddd1400506f9a11" ]
for t in tests:
req = dev[0].request("GAS_REQUEST " + bssid + " " + t)
if "FAIL" in req:
raise Exception("GAS query request rejected")
ev = dev[0].wait_event(["GAS-QUERY-START"], timeout=5)
if ev is None:
raise Exception("GAS query did not start")
ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=5)
if ev is None:
raise Exception("GAS query did not complete")
if t == "00 dddd0600506f9a110600":
hapd.set("nai_realm", "0,another.example.com")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5")
if "OK" not in wpas.request("P2P_SET listen_channel 1"):
raise Exception("Failed to set listen channel")
if "OK" not in wpas.p2p_listen():
raise Exception("Failed to start listen state")
if "FAIL" in wpas.request("SET ext_mgmt_frame_handling 1"):
raise Exception("Failed to enable external management frame handling")
anqp_query = struct.pack('<HHHHHHHHHH', 256, 16, 257, 258, 260, 261, 262, 263, 264, 268)
gas = struct.pack('<H', len(anqp_query)) + anqp_query
dialog_token = 100
adv = struct.pack('BBBB', 109, 2, 0, 0)
adv2 = struct.pack('BBB', 108, 1, 0)
adv3 = struct.pack('BBBB', 108, 3, 0, 0)
msg = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + adv + gas
msg2 = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + adv2 + gas
msg3 = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + adv3
msg4 = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + anqp_adv_proto()
msg5 = struct.pack('<BBB', ACTION_CATEG_PUBLIC, GAS_INITIAL_REQUEST,
dialog_token) + anqp_adv_proto() + struct.pack('<H', 1)
msg6 = struct.pack('<BB', ACTION_CATEG_PUBLIC, GAS_COMEBACK_REQUEST)
tests = [ msg, msg2, msg3, msg4, msg5, msg6 ]
for t in tests:
req = "MGMT_TX {} {} freq=2412 wait_time=10 action={}".format(bssid, bssid, binascii.hexlify(t))
if "OK" not in wpas.request(req):
raise Exception("Could not send management frame")
ev = wpas.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("No ACK frame seen")