tests: MACsec with hostapd

Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
This commit is contained in:
Jouni Malinen 2019-06-03 17:49:55 +03:00
parent abe025dd31
commit 41bca92662

View file

@ -12,6 +12,7 @@ import signal
import subprocess
import time
import hostapd
from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
@ -138,6 +139,32 @@ def set_mka_psk_config(dev, mka_priority=None, integ_only=False, port=None,
dev.select_network(id)
def set_mka_eap_config(dev, mka_priority=None, integ_only=False, port=None):
dev.set("eapol_version", "3")
dev.set("ap_scan", "0")
dev.set("fast_reauth", "1")
id = dev.add_network()
dev.set_network(id, "key_mgmt", "NONE")
dev.set_network(id, "eapol_flags", "0")
dev.set_network(id, "macsec_policy", "1")
if integ_only:
dev.set_network(id, "macsec_integ_only", "1")
if mka_priority is not None:
dev.set_network(id, "mka_priority", str(mka_priority))
if port is not None:
dev.set_network(id, "macsec_port", str(port))
dev.set_network(id, "key_mgmt", "IEEE8021X")
dev.set_network(id, "eap", "TTLS")
dev.set_network_quoted(id, "ca_cert", "auth_serv/ca.pem")
dev.set_network_quoted(id, "phase2", "auth=MSCHAPV2")
dev.set_network_quoted(id, "anonymous_identity", "ttls")
dev.set_network_quoted(id, "identity", "DOMAIN\mschapv2 user")
dev.set_network_quoted(id, "password", "password")
dev.select_network(id)
def log_ip_macsec():
cmd = subprocess.Popen(["ip", "macsec", "show"],
stdout=subprocess.PIPE,
@ -184,16 +211,20 @@ def lower_addr(addr1, addr2):
return False
return False
def wait_mka_done(wpa, expect_failure=False):
def wait_mka_done(wpa, expect_failure=False, hostapd=False):
max_iter = 14 if expect_failure else 40
for i in range(max_iter):
done = True
for w in wpa:
secured = w.get_status_field("Secured")
peers = int(w.get_status_field("live_peers"))
live_peers = w.get_status_field("live_peers")
peers = int(live_peers) if live_peers else 0
if expect_failure and (secured == "Yes" or peers > 0):
raise Exception("MKA completed unexpectedly")
if peers != len(wpa) - 1 or secured != "Yes":
expect_peers = len(wpa) - 1
if hostapd:
expect_peers += 1
if peers != expect_peers or secured != "Yes":
done = False
break
w.dump_monitor()
@ -207,6 +238,10 @@ def wait_mka_done(wpa, expect_failure=False):
if not done:
raise Exception("MKA not completed successfully")
if hostapd:
# TODO: check that hostapd is the key server
return
key_server = None
ks_prio = 999
for w in wpa:
@ -702,3 +737,205 @@ def test_macsec_psk_fail_cp2(dev, apdev):
wait_mka_done(wpa)
finally:
cleanup_macsec()
def cleanup_macsec_hostapd():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
wpas.interface_remove("veth0")
del wpas
hapd = hostapd.HostapdGlobal()
hapd.remove('veth1')
subprocess.call(["ip", "link", "del", "veth0"],
stderr=open('/dev/null', 'w'))
log_ip_link()
def test_macsec_hostapd_psk(dev, apdev, params):
"""MACsec PSK with hostapd"""
try:
run_macsec_hostapd_psk(dev, apdev, params, "macsec_hostapd_psk")
finally:
cleanup_macsec_hostapd()
def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
port0=None, port1=None, ckn0=None, ckn1=None,
cak0=None, cak1=None, expect_failure=False):
add_veth()
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
for i in range(2):
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0,
cak=cak0, mka_priority=100)
if cak1 is None:
cak1 = "000102030405060708090a0b0c0d0e0f"
if ckn1 is None:
ckn1 = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
params = {"driver": "macsec_linux",
"interface": "veth1",
"eapol_version": "3",
"mka_cak": cak1,
"mka_ckn": ckn1,
"macsec_policy": "1",
"mka_priority": "1"}
if integ_only:
params["macsec_integ_only"] = "1"
if port1 is not None:
params["macsec_port"] = str(port1)
apdev = {'ifname': 'veth1'}
try:
hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
except:
raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
log_ip_macsec()
log_ip_link()
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
log_ip_link()
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
log_ip_macsec()
hwsim_utils.test_connectivity(wpas0, hapd,
ifname1=macsec_ifname0,
ifname2=macsec_ifname1,
send_len=1400)
log_ip_macsec()
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()
def test_macsec_hostapd_eap(dev, apdev, params):
"""MACsec EAP with hostapd"""
try:
run_macsec_hostapd_eap(dev, apdev, params, "macsec_hostapd_eap")
finally:
cleanup_macsec_hostapd()
def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
port0=None, port1=None, expect_failure=False):
add_veth()
cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap")
cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap")
cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap")
cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap")
for i in range(2):
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
'-w', cap_veth0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
'-w', cap_veth1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
set_mka_eap_config(wpas0, integ_only=integ_only, port=port0,
mka_priority=100)
params = {"driver": "macsec_linux",
"interface": "veth1",
"eapol_version": "3",
"macsec_policy": "1",
"mka_priority": "1",
"ieee8021x": "1",
"auth_server_addr": "127.0.0.1",
"auth_server_port": "1812",
"auth_server_shared_secret": "radius",
"nas_identifier": "nas.w1.fi"}
if integ_only:
params["macsec_integ_only"] = "1"
if port1 is not None:
params["macsec_port"] = str(port1)
apdev = {'ifname': 'veth1'}
try:
hapd = hostapd.add_ap(apdev, params, driver="macsec_linux")
except:
raise HwsimSkip("No CONFIG_MACSEC=y in hostapd")
log_ip_macsec()
log_ip_link()
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER"))
wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True)
log_ip_link()
if expect_failure:
for i in range(len(cmd)):
cmd[i].terminate()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
'-w', cap_macsec0, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
'-w', cap_macsec1, '-s', '2000',
'--immediate-mode'],
stderr=open('/dev/null', 'w'))
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
log_ip_macsec()
hwsim_utils.test_connectivity(wpas0, hapd,
ifname1=macsec_ifname0,
ifname2=macsec_ifname1,
send_len=1400)
log_ip_macsec()
time.sleep(1)
for i in range(len(cmd)):
cmd[i].terminate()