From 41bca926628a1fe155a073215845d50e5e92e916 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Mon, 3 Jun 2019 17:49:55 +0300 Subject: [PATCH] tests: MACsec with hostapd Signed-off-by: Jouni Malinen --- tests/hwsim/test_macsec.py | 243 ++++++++++++++++++++++++++++++++++++- 1 file changed, 240 insertions(+), 3 deletions(-) diff --git a/tests/hwsim/test_macsec.py b/tests/hwsim/test_macsec.py index 3540bcbc8..8ef4fec61 100644 --- a/tests/hwsim/test_macsec.py +++ b/tests/hwsim/test_macsec.py @@ -12,6 +12,7 @@ import signal import subprocess import time +import hostapd from wpasupplicant import WpaSupplicant import hwsim_utils from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger @@ -138,6 +139,32 @@ def set_mka_psk_config(dev, mka_priority=None, integ_only=False, port=None, dev.select_network(id) +def set_mka_eap_config(dev, mka_priority=None, integ_only=False, port=None): + dev.set("eapol_version", "3") + dev.set("ap_scan", "0") + dev.set("fast_reauth", "1") + + id = dev.add_network() + dev.set_network(id, "key_mgmt", "NONE") + dev.set_network(id, "eapol_flags", "0") + dev.set_network(id, "macsec_policy", "1") + if integ_only: + dev.set_network(id, "macsec_integ_only", "1") + if mka_priority is not None: + dev.set_network(id, "mka_priority", str(mka_priority)) + if port is not None: + dev.set_network(id, "macsec_port", str(port)) + + dev.set_network(id, "key_mgmt", "IEEE8021X") + dev.set_network(id, "eap", "TTLS") + dev.set_network_quoted(id, "ca_cert", "auth_serv/ca.pem") + dev.set_network_quoted(id, "phase2", "auth=MSCHAPV2") + dev.set_network_quoted(id, "anonymous_identity", "ttls") + dev.set_network_quoted(id, "identity", "DOMAIN\mschapv2 user") + dev.set_network_quoted(id, "password", "password") + + dev.select_network(id) + def log_ip_macsec(): cmd = subprocess.Popen(["ip", "macsec", "show"], stdout=subprocess.PIPE, @@ -184,16 +211,20 @@ def lower_addr(addr1, addr2): return False return False -def wait_mka_done(wpa, expect_failure=False): +def wait_mka_done(wpa, expect_failure=False, hostapd=False): max_iter = 14 if expect_failure else 40 for i in range(max_iter): done = True for w in wpa: secured = w.get_status_field("Secured") - peers = int(w.get_status_field("live_peers")) + live_peers = w.get_status_field("live_peers") + peers = int(live_peers) if live_peers else 0 if expect_failure and (secured == "Yes" or peers > 0): raise Exception("MKA completed unexpectedly") - if peers != len(wpa) - 1 or secured != "Yes": + expect_peers = len(wpa) - 1 + if hostapd: + expect_peers += 1 + if peers != expect_peers or secured != "Yes": done = False break w.dump_monitor() @@ -207,6 +238,10 @@ def wait_mka_done(wpa, expect_failure=False): if not done: raise Exception("MKA not completed successfully") + if hostapd: + # TODO: check that hostapd is the key server + return + key_server = None ks_prio = 999 for w in wpa: @@ -702,3 +737,205 @@ def test_macsec_psk_fail_cp2(dev, apdev): wait_mka_done(wpa) finally: cleanup_macsec() + +def cleanup_macsec_hostapd(): + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False) + wpas.interface_remove("veth0") + del wpas + hapd = hostapd.HostapdGlobal() + hapd.remove('veth1') + subprocess.call(["ip", "link", "del", "veth0"], + stderr=open('/dev/null', 'w')) + log_ip_link() + +def test_macsec_hostapd_psk(dev, apdev, params): + """MACsec PSK with hostapd""" + try: + run_macsec_hostapd_psk(dev, apdev, params, "macsec_hostapd_psk") + finally: + cleanup_macsec_hostapd() + +def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False, + port0=None, port1=None, ckn0=None, ckn1=None, + cak0=None, cak1=None, expect_failure=False): + add_veth() + + cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap") + cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap") + cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap") + cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap") + + for i in range(2): + subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"]) + + cmd = {} + cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0', + '-w', cap_veth0, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1', + '-w', cap_veth1, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + + wpa = add_wpas_interfaces(count=1) + wpas0 = wpa[0] + + set_mka_psk_config(wpas0, integ_only=integ_only, port=port0, ckn=ckn0, + cak=cak0, mka_priority=100) + + if cak1 is None: + cak1 = "000102030405060708090a0b0c0d0e0f" + if ckn1 is None: + ckn1 = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f" + params = {"driver": "macsec_linux", + "interface": "veth1", + "eapol_version": "3", + "mka_cak": cak1, + "mka_ckn": ckn1, + "macsec_policy": "1", + "mka_priority": "1"} + if integ_only: + params["macsec_integ_only"] = "1" + if port1 is not None: + params["macsec_port"] = str(port1) + apdev = {'ifname': 'veth1'} + try: + hapd = hostapd.add_ap(apdev, params, driver="macsec_linux") + except: + raise HwsimSkip("No CONFIG_MACSEC=y in hostapd") + + log_ip_macsec() + log_ip_link() + + logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS")) + logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER")) + + wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True) + log_ip_link() + + if expect_failure: + for i in range(len(cmd)): + cmd[i].terminate() + return + + macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") + macsec_ifname1 = hapd.get_driver_status_field("parent_ifname") + + cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0, + '-w', cap_macsec0, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1, + '-w', cap_macsec1, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + time.sleep(0.5) + + logger.info("wpas0 MIB:\n" + wpas0.request("MIB")) + logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS")) + log_ip_macsec() + hwsim_utils.test_connectivity(wpas0, hapd, + ifname1=macsec_ifname0, + ifname2=macsec_ifname1, + send_len=1400) + log_ip_macsec() + + time.sleep(1) + for i in range(len(cmd)): + cmd[i].terminate() + +def test_macsec_hostapd_eap(dev, apdev, params): + """MACsec EAP with hostapd""" + try: + run_macsec_hostapd_eap(dev, apdev, params, "macsec_hostapd_eap") + finally: + cleanup_macsec_hostapd() + +def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False, + port0=None, port1=None, expect_failure=False): + add_veth() + + cap_veth0 = os.path.join(params['logdir'], prefix + ".veth0.pcap") + cap_veth1 = os.path.join(params['logdir'], prefix + ".veth1.pcap") + cap_macsec0 = os.path.join(params['logdir'], prefix + ".macsec0.pcap") + cap_macsec1 = os.path.join(params['logdir'], prefix + ".macsec1.pcap") + + for i in range(2): + subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"]) + + cmd = {} + cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0', + '-w', cap_veth0, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1', + '-w', cap_veth1, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + + wpa = add_wpas_interfaces(count=1) + wpas0 = wpa[0] + + set_mka_eap_config(wpas0, integ_only=integ_only, port=port0, + mka_priority=100) + + params = {"driver": "macsec_linux", + "interface": "veth1", + "eapol_version": "3", + "macsec_policy": "1", + "mka_priority": "1", + "ieee8021x": "1", + "auth_server_addr": "127.0.0.1", + "auth_server_port": "1812", + "auth_server_shared_secret": "radius", + "nas_identifier": "nas.w1.fi"} + if integ_only: + params["macsec_integ_only"] = "1" + if port1 is not None: + params["macsec_port"] = str(port1) + apdev = {'ifname': 'veth1'} + try: + hapd = hostapd.add_ap(apdev, params, driver="macsec_linux") + except: + raise HwsimSkip("No CONFIG_MACSEC=y in hostapd") + + log_ip_macsec() + log_ip_link() + + logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS")) + logger.info("wpas0 STATUS-DRIVER:\n" + wpas0.request("STATUS-DRIVER")) + + wait_mka_done(wpa, expect_failure=expect_failure, hostapd=True) + log_ip_link() + + if expect_failure: + for i in range(len(cmd)): + cmd[i].terminate() + return + + macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname") + macsec_ifname1 = hapd.get_driver_status_field("parent_ifname") + + cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0, + '-w', cap_macsec0, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1, + '-w', cap_macsec1, '-s', '2000', + '--immediate-mode'], + stderr=open('/dev/null', 'w')) + time.sleep(0.5) + + logger.info("wpas0 MIB:\n" + wpas0.request("MIB")) + logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS")) + log_ip_macsec() + hwsim_utils.test_connectivity(wpas0, hapd, + ifname1=macsec_ifname0, + ifname2=macsec_ifname1, + send_len=1400) + log_ip_macsec() + + time.sleep(1) + for i in range(len(cmd)): + cmd[i].terminate()