tests: Convert proxyarp tests to use DATA_TEST_FRAME

This is more robust and extensible than configuring IPv6 addresses on
the interfaces and trying to use ping6 or some other external tools to
generate suitable IPv6 frames.

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
This commit is contained in:
Jouni Malinen 2014-11-27 23:53:22 +02:00 committed by Jouni Malinen
parent fc0ef7c0e7
commit 67aeee118a

View file

@ -4,12 +4,15 @@
# This software may be distributed under the terms of the BSD license. # This software may be distributed under the terms of the BSD license.
# See README for more details. # See README for more details.
import binascii
import struct
import time import time
import subprocess import subprocess
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
import os import os
import os.path import os.path
import socket
import subprocess import subprocess
import hostapd import hostapd
@ -2135,12 +2138,6 @@ def _test_ap_hs20_proxyarp(dev, apdev):
dev[0].hs20_enable() dev[0].hs20_enable()
subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64',
'dev', dev[0].ifname])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64',
'dev', dev[1].ifname])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64',
'dev', dev[1].ifname])
id = dev[0].add_cred_values({ 'realm': "example.com", id = dev[0].add_cred_values({ 'realm': "example.com",
'username': "hs20-test", 'username': "hs20-test",
@ -2157,12 +2154,40 @@ def _test_ap_hs20_proxyarp(dev, apdev):
scan_freq="2412") scan_freq="2412")
time.sleep(0.1) time.sleep(0.1)
subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'], addr0 = dev[0].p2p_interface_addr()
stdout=open('/dev/null', 'w')) addr1 = dev[1].p2p_interface_addr()
src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':',''))
src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':',''))
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt=src_ll_opt0)
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
opt=src_ll_opt1)
if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
opt=src_ll_opt1)
if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
matches = get_permanent_neighbors("ap-br0") matches = get_permanent_neighbors("ap-br0")
logger.info("After connect: " + str(matches)) logger.info("After connect: " + str(matches))
if len(matches) < 1: if len(matches) != 3:
raise Exception("No neighbor entries after connect") raise Exception("Unexpected number of neighbor entries after connect")
if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches:
raise Exception("dev0 addr missing")
if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
raise Exception("dev1 addr(1) missing")
if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
raise Exception("dev1 addr(2) missing")
dev[0].request("DISCONNECT") dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT") dev[1].request("DISCONNECT")
time.sleep(0.5) time.sleep(0.5)
@ -2181,15 +2206,49 @@ def test_ap_hs20_proxyarp(dev, apdev):
stderr=open('/dev/null', 'w')) stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'], subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w')) stderr=open('/dev/null', 'w'))
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64',
'dev', dev[0].ifname])
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64',
'dev', dev[1].ifname])
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64',
'dev', dev[1].ifname])
return res return res
def ip_checksum(buf):
sum = 0
if len(buf) & 0x01:
buf += '\0x00'
for i in range(0, len(buf), 2):
val, = struct.unpack('H', buf[i:i+2])
sum += val
while (sum >> 16):
sum = (sum & 0xffff) + (sum >> 16)
return struct.pack('H', ~sum & 0xffff)
def build_icmpv6(ipv6_addrs, type, code, payload):
start = struct.pack("BB", type, code)
end = payload
icmp = start + '\x00\x00' + end
pseudo = ipv6_addrs + struct.pack(">LBBBB", len(icmp), 0, 0, 0, 58)
csum = ip_checksum(pseudo + icmp)
return start + csum + end
def build_ns(src_ll, ip_src, ip_dst, target, opt=None):
link_mc = binascii.unhexlify("3333ff000002")
_src_ll = binascii.unhexlify(src_ll.replace(':',''))
proto = '\x86\xdd'
ehdr = link_mc + _src_ll + proto
_ip_src = socket.inet_pton(socket.AF_INET6, ip_src)
_ip_dst = socket.inet_pton(socket.AF_INET6, ip_dst)
reserved = '\x00\x00\x00\x00'
_target = socket.inet_pton(socket.AF_INET6, target)
if opt:
payload = reserved + _target + opt
else:
payload = reserved + _target
icmp = build_icmpv6(_ip_src + _ip_dst, 135, 0, payload)
ipv6 = struct.pack('>BBBBHBB', 0x60, 0, 0, 0, len(icmp), 58, 255)
ipv6 += _ip_src + _ip_dst
return ehdr + ipv6 + icmp
def get_permanent_neighbors(ifname): def get_permanent_neighbors(ifname):
cmd = subprocess.Popen(['ip', 'nei'], stdout=subprocess.PIPE) cmd = subprocess.Popen(['ip', 'nei'], stdout=subprocess.PIPE)
res = cmd.stdout.read() res = cmd.stdout.read()
@ -2218,27 +2277,78 @@ def _test_proxyarp_open(dev, apdev):
subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64',
'dev', dev[0].ifname])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64',
'dev', dev[1].ifname])
subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64',
'dev', dev[1].ifname])
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412") dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
time.sleep(0.1) time.sleep(0.1)
subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'], addr0 = dev[0].p2p_interface_addr()
stdout=open('/dev/null', 'w')) addr1 = dev[1].p2p_interface_addr()
subprocess.call(['ping6', 'aaaa:bbbb:dddd::2', '-c', '1', '-w', '1'],
stdout=open('/dev/null', 'w')) src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':',''))
subprocess.call(['ping6', 'aaaa:bbbb:eeee::2', '-c', '1', '-w', '1'], src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':',''))
stdout=open('/dev/null', 'w'))
# DAD NS
pkt = build_ns(src_ll=addr0, ip_src="::", ip_dst="ff02::1:ff00:2",
target="aaaa:bbbb:cccc::2", opt=src_ll_opt0)
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt=src_ll_opt0)
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
# test frame without source link-layer address option
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2")
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
# test frame with bogus option
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt="\x70\x01\x01\x02\x03\x04\x05\x05")
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
# test frame with truncated source link-layer address option
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt="\x01\x01\x01\x02\x03\x04")
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
# test frame with foreign source link-layer address option
pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2",
opt="\x01\x01\x01\x02\x03\x04\x05\x06")
if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2",
opt=src_ll_opt1)
if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2",
ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2",
opt=src_ll_opt1)
if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
# another copy for additional code coverage
if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)):
raise Exception("DATA_TEST_FRAME failed")
matches = get_permanent_neighbors("ap-br0") matches = get_permanent_neighbors("ap-br0")
logger.info("After connect: " + str(matches)) logger.info("After connect: " + str(matches))
if len(matches) != 3: if len(matches) != 3:
raise Exception("Unexpected number of neighbor entries after connect") raise Exception("Unexpected number of neighbor entries after connect")
if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches:
raise Exception("dev0 addr missing")
if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
raise Exception("dev1 addr(1) missing")
if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches:
raise Exception("dev1 addr(2) missing")
dev[0].request("DISCONNECT") dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT") dev[1].request("DISCONNECT")
time.sleep(0.5) time.sleep(0.5)
@ -2257,11 +2367,5 @@ def test_proxyarp_open(dev, apdev):
stderr=open('/dev/null', 'w')) stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'ap-br0'], subprocess.call(['brctl', 'delbr', 'ap-br0'],
stderr=open('/dev/null', 'w')) stderr=open('/dev/null', 'w'))
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64',
'dev', dev[0].ifname])
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64',
'dev', dev[1].ifname])
subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64',
'dev', dev[1].ifname])
return res return res