#!/usr/bin/python # # Python class for controlling wpa_supplicant # Copyright (c) 2013, Jouni Malinen # # This software may be distributed under the terms of the BSD license. # See README for more details. import os import time import logging import re import subprocess import wpaspy logger = logging.getLogger() wpas_ctrl = '/var/run/wpa_supplicant' class WpaSupplicant: def __init__(self, ifname, global_iface=None): self.ifname = ifname self.group_ifname = None self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) self.mon.attach() self.global_iface = global_iface if global_iface: self.global_ctrl = wpaspy.Ctrl(global_iface) self.global_mon = wpaspy.Ctrl(global_iface) self.global_mon.attach() def request(self, cmd): logger.debug(self.ifname + ": CTRL: " + cmd) return self.ctrl.request(cmd) def global_request(self, cmd): if self.global_iface is None: self.request(cmd) else: logger.debug(self.ifname + ": CTRL: " + cmd) return self.global_ctrl.request(cmd) def group_request(self, cmd): if self.group_ifname and self.group_ifname != self.ifname: logger.debug(self.group_ifname + ": CTRL: " + cmd) gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) return gctrl.request(cmd) return self.request(cmd) def ping(self): return "PONG" in self.request("PING") def reset(self): res = self.request("FLUSH") if not "OK" in res: logger.info("FLUSH to " + self.ifname + " failed: " + res) self.request("SET ignore_old_scan_res 0") self.request("SET external_sim 0") self.request("SET hessid 00:00:00:00:00:00") self.request("SET access_network_type 15") self.request("SET p2p_add_cli_chan 0") self.request("SET p2p_no_go_freq ") self.request("SET p2p_pref_chan ") self.request("SET disallow_aps ") self.request("SET p2p_no_group_iface 1") self.request("P2P_SET per_sta_psk 0") self.request("P2P_SET disabled 0") self.request("P2P_SERVICE_FLUSH") self.group_ifname = None self.dump_monitor() iter = 0 while iter < 60: state = self.get_driver_status_field("scan_state") if "SCAN_STARTED" in state or "SCAN_REQUESTED" in state: logger.info(self.ifname + ": Waiting for scan operation to complete before continuing") time.sleep(1) else: break iter = iter + 1 if iter == 60: logger.error(self.ifname + ": Driver scan state did not clear") print "Trying to clear cfg80211/mac80211 scan state" try: cmd = ["sudo", "ifconfig", self.ifname, "down"] subprocess.call(cmd) except subprocess.CalledProcessError, e: logger.info("ifconfig failed: " + str(e.returncode)) logger.info(e.output) try: cmd = ["sudo", "ifconfig", self.ifname, "up"] subprocess.call(cmd) except subprocess.CalledProcessError, e: logger.info("ifconfig failed: " + str(e.returncode)) logger.info(e.output) if not self.ping(): logger.info("No PING response from " + self.ifname + " after reset") def add_network(self): id = self.request("ADD_NETWORK") if "FAIL" in id: raise Exception("ADD_NETWORK failed") return int(id) def remove_network(self, id): id = self.request("REMOVE_NETWORK " + str(id)) if "FAIL" in id: raise Exception("REMOVE_NETWORK failed") return None def set_network(self, id, field, value): res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value) if "FAIL" in res: raise Exception("SET_NETWORK failed") return None def set_network_quoted(self, id, field, value): res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') if "FAIL" in res: raise Exception("SET_NETWORK failed") return None def list_networks(self): res = self.request("LIST_NETWORKS") lines = res.splitlines() networks = [] for l in lines: if "network id" in l: continue [id,ssid,bssid,flags] = l.split('\t') network = {} network['id'] = id network['ssid'] = ssid network['bssid'] = bssid network['flags'] = flags networks.append(network) return networks def hs20_enable(self): self.request("SET interworking 1") self.request("SET hs20 1") def add_cred(self): id = self.request("ADD_CRED") if "FAIL" in id: raise Exception("ADD_CRED failed") return int(id) def remove_cred(self, id): id = self.request("REMOVE_CRED " + str(id)) if "FAIL" in id: raise Exception("REMOVE_CRED failed") return None def set_cred(self, id, field, value): res = self.request("SET_CRED " + str(id) + " " + field + " " + value) if "FAIL" in res: raise Exception("SET_CRED failed") return None def set_cred_quoted(self, id, field, value): res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"') if "FAIL" in res: raise Exception("SET_CRED failed") return None def add_cred_values(self, params): id = self.add_cred() quoted = [ "realm", "username", "password", "domain", "imsi", "excluded_ssid", "milenage", "ca_cert", "client_cert", "private_key" ] for field in quoted: if field in params: self.set_cred_quoted(id, field, params[field]) not_quoted = [ "eap", "roaming_consortium", "required_roaming_consortium" ] for field in not_quoted: if field in params: self.set_cred(id, field, params[field]) return id; def select_network(self, id): id = self.request("SELECT_NETWORK " + str(id)) if "FAIL" in id: raise Exception("SELECT_NETWORK failed") return None def connect_network(self, id): self.dump_monitor() self.select_network(id) ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10) if ev is None: raise Exception("Association with the AP timed out") self.dump_monitor() def get_status(self): res = self.request("STATUS") lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_status_field(self, field): vals = self.get_status() if field in vals: return vals[field] return None def get_group_status(self): res = self.group_request("STATUS") lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_group_status_field(self, field): vals = self.get_group_status() if field in vals: return vals[field] return None def get_driver_status(self): res = self.request("STATUS-DRIVER") lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_driver_status_field(self, field): vals = self.get_driver_status() if field in vals: return vals[field] return None def p2p_dev_addr(self): return self.get_status_field("p2p_device_address") def p2p_interface_addr(self): return self.get_group_status_field("address") def p2p_listen(self): return self.global_request("P2P_LISTEN") def p2p_find(self, social=False): if social: return self.global_request("P2P_FIND type=social") return self.global_request("P2P_FIND") def p2p_stop_find(self): return self.global_request("P2P_STOP_FIND") def wps_read_pin(self): #TODO: make this random self.pin = "12345670" return self.pin def peer_known(self, peer, full=True): res = self.global_request("P2P_PEER " + peer) if peer.lower() not in res.lower(): return False if not full: return True return "[PROBE_REQ_ONLY]" not in res def discover_peer(self, peer, full=True, timeout=15, social=True): logger.info(self.ifname + ": Trying to discover peer " + peer) if self.peer_known(peer, full): return True self.p2p_find(social) count = 0 while count < timeout: time.sleep(1) count = count + 1 if self.peer_known(peer, full): return True return False def get_peer(self, peer): res = self.global_request("P2P_PEER " + peer) if peer.lower() not in res.lower(): raise Exception("Peer information not available") lines = res.splitlines() vals = dict() for l in lines: if '=' in l: [name,value] = l.split('=', 1) vals[name] = value return vals def group_form_result(self, ev, expect_failure=False, go_neg_res=None): if expect_failure: if "P2P-GROUP-STARTED" in ev: raise Exception("Group formation succeeded when expecting failure") exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)' s = re.split(exp, ev) if len(s) < 3: return None res = {} res['result'] = 'go-neg-failed' res['status'] = int(s[2]) return res if "P2P-GROUP-STARTED" not in ev: raise Exception("No P2P-GROUP-STARTED event seen") exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)' s = re.split(exp, ev) if len(s) < 8: raise Exception("Could not parse P2P-GROUP-STARTED") res = {} res['result'] = 'success' res['ifname'] = s[2] self.group_ifname = s[2] res['role'] = s[3] res['ssid'] = s[4] res['freq'] = s[5] if "[PERSISTENT]" in ev: res['persistent'] = True else: res['persistent'] = False p = re.match(r'psk=([0-9a-f]*)', s[6]) if p: res['psk'] = p.group(1) p = re.match(r'passphrase="(.*)"', s[6]) if p: res['passphrase'] = p.group(1) res['go_dev_addr'] = s[7] if go_neg_res: exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)' s = re.split(exp, go_neg_res) if len(s) < 4: raise Exception("Could not parse P2P-GO-NEG-SUCCESS") res['go_neg_role'] = s[2] res['go_neg_freq'] = s[3] return res def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, persistent=False, freq=None): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) if persistent: cmd = cmd + " persistent" if "OK" in self.global_request(cmd): return None raise Exception("P2P_CONNECT (auth) failed") def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", "P2P-GO-NEG-FAILURE"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") if "P2P-GO-NEG-SUCCESS" in ev: go_neg_res = ev ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() return self.group_form_result(ev, expect_failure, go_neg_res) def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False, persistent=False, freq=None): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() if pin: cmd = "P2P_CONNECT " + peer + " " + pin + " " + method else: cmd = "P2P_CONNECT " + peer + " " + method if go_intent: cmd = cmd + ' go_intent=' + str(go_intent) if freq: cmd = cmd + ' freq=' + str(freq) if persistent: cmd = cmd + " persistent" if "OK" in self.global_request(cmd): if timeout == 0: self.dump_monitor() return None go_neg_res = None ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", "P2P-GO-NEG-FAILURE"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") if "P2P-GO-NEG-SUCCESS" in ev: go_neg_res = ev ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() return self.group_form_result(ev, expect_failure, go_neg_res) raise Exception("P2P_CONNECT failed") def wait_event(self, events, timeout=10): count = 0 while count < timeout * 10: count = count + 1 time.sleep(0.1) while self.mon.pending(): ev = self.mon.recv() logger.debug(self.ifname + ": " + ev) for event in events: if event in ev: return ev return None def wait_global_event(self, events, timeout): if self.global_iface is None: self.wait_event(events, timeout) else: count = 0 while count < timeout * 10: count = count + 1 time.sleep(0.1) while self.global_mon.pending(): ev = self.global_mon.recv() logger.debug(self.ifname + "(global): " + ev) for event in events: if event in ev: return ev return None def wait_go_ending_session(self): ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3) if ev is None: raise Exception("Group removal event timed out") if "reason=GO_ENDING_SESSION" not in ev: raise Exception("Unexpected group removal reason") def dump_monitor(self): while self.mon.pending(): ev = self.mon.recv() logger.debug(self.ifname + ": " + ev) while self.global_mon.pending(): ev = self.global_mon.recv() logger.debug(self.ifname + "(global): " + ev) def remove_group(self, ifname=None): if ifname is None: ifname = self.group_ifname if self.group_ifname else self.ifname if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname): raise Exception("Group could not be removed") self.group_ifname = None def p2p_start_go(self, persistent=None, freq=None): self.dump_monitor() cmd = "P2P_GROUP_ADD" if persistent is None: pass elif persistent is True: cmd = cmd + " persistent" else: cmd = cmd + " persistent=" + str(persistent) if freq: cmd = cmd + " freq=" + str(freq) if "OK" in self.global_request(cmd): ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5) if ev is None: raise Exception("GO start up timed out") self.dump_monitor() return self.group_form_result(ev) raise Exception("P2P_GROUP_ADD failed") def p2p_go_authorize_client(self, pin): cmd = "WPS_PIN any " + pin if "FAIL" in self.group_request(cmd): raise Exception("Failed to authorize client connection on GO") return None def p2p_go_authorize_client_pbc(self): cmd = "WPS_PBC" if "FAIL" in self.group_request(cmd): raise Exception("Failed to authorize client connection on GO") return None def p2p_connect_group(self, go_addr, pin, timeout=0, social=False): self.dump_monitor() if not self.discover_peer(go_addr, social=social): raise Exception("GO " + go_addr + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + go_addr + " " + pin + " join" if "OK" in self.global_request(cmd): if timeout == 0: self.dump_monitor() return None ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) if ev is None: raise Exception("Joining the group timed out") self.dump_monitor() return self.group_form_result(ev) raise Exception("P2P_CONNECT(join) failed") def tdls_setup(self, peer): cmd = "TDLS_SETUP " + peer if "FAIL" in self.group_request(cmd): raise Exception("Failed to request TDLS setup") return None def tdls_teardown(self, peer): cmd = "TDLS_TEARDOWN " + peer if "FAIL" in self.group_request(cmd): raise Exception("Failed to request TDLS teardown") return None def connect(self, ssid, psk=None, proto=None, key_mgmt=None, wep_key0=None, ieee80211w=None, pairwise=None, group=None, scan_freq=None, eap=None, identity=None, anonymous_identity=None, password=None, phase1=None, phase2=None, ca_cert=None, domain_suffix_match=None, password_hex=None, client_cert=None, private_key=None, peerkey=False, okc=False, wait_connect=True, only_add_network=False): logger.info("Connect STA " + self.ifname + " to AP") id = self.add_network() self.set_network_quoted(id, "ssid", ssid) if psk: self.set_network_quoted(id, "psk", psk) if proto: self.set_network(id, "proto", proto) if key_mgmt: self.set_network(id, "key_mgmt", key_mgmt) if ieee80211w: self.set_network(id, "ieee80211w", ieee80211w) if pairwise: self.set_network(id, "pairwise", pairwise) if group: self.set_network(id, "group", group) if wep_key0: self.set_network(id, "wep_key0", wep_key0) if scan_freq: self.set_network(id, "scan_freq", scan_freq) if eap: self.set_network(id, "eap", eap) if identity: self.set_network_quoted(id, "identity", identity) if anonymous_identity: self.set_network_quoted(id, "anonymous_identity", anonymous_identity) if password: self.set_network_quoted(id, "password", password) if password_hex: self.set_network(id, "password", password_hex) if ca_cert: self.set_network_quoted(id, "ca_cert", ca_cert) if client_cert: self.set_network_quoted(id, "client_cert", client_cert) if private_key: self.set_network_quoted(id, "private_key", private_key) if phase1: self.set_network_quoted(id, "phase1", phase1) if phase2: self.set_network_quoted(id, "phase2", phase2) if domain_suffix_match: self.set_network_quoted(id, "domain_suffix_match", domain_suffix_match) if peerkey: self.set_network(id, "peerkey", "1") if okc: self.set_network(id, "proactive_key_caching", "1") if only_add_network: return id if wait_connect: self.connect_network(id) else: self.dump_monitor() self.select_network(id) return id def scan(self, type=None, freq=None): if type: cmd = "SCAN TYPE=" + type else: cmd = "SCAN" if freq: cmd = cmd + " freq=" + freq self.dump_monitor() if not "OK" in self.request(cmd): raise Exception("Failed to trigger scan") ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) if ev is None: raise Exception("Scan timed out") def roam(self, bssid): self.dump_monitor() self.request("ROAM " + bssid) ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10) if ev is None: raise Exception("Roaming with the AP timed out") self.dump_monitor() def roam_over_ds(self, bssid): self.dump_monitor() self.request("FT_DS " + bssid) ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10) if ev is None: raise Exception("Roaming with the AP timed out") self.dump_monitor() def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None, new_passphrase=None, no_wait=False): self.dump_monitor() if new_ssid: self.request("WPS_REG " + bssid + " " + pin + " " + new_ssid.encode("hex") + " " + key_mgmt + " " + cipher + " " + new_passphrase.encode("hex")) if no_wait: return ev = self.wait_event(["WPS-SUCCESS"], timeout=15) else: self.request("WPS_REG " + bssid + " " + pin) if no_wait: return ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15) if ev is None: raise Exception("WPS cred timed out") ev = self.wait_event(["WPS-FAIL"], timeout=15) if ev is None: raise Exception("WPS timed out") ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) if ev is None: raise Exception("Association with the AP timed out") def relog(self): self.request("RELOG") def wait_completed(self, timeout=10): for i in range(0, timeout * 2): if self.get_status_field("wpa_state") == "COMPLETED": return time.sleep(0.5) raise Exception("Timeout while waiting for COMPLETED state") def get_capability(self, field): res = self.request("GET_CAPABILITY " + field) if "FAIL" in res: return None return res.split(' ') def get_bss(self, bssid): res = self.request("BSS " + bssid) lines = res.splitlines() vals = dict() for l in lines: [name,value] = l.split('=', 1) vals[name] = value return vals def get_pmksa(self, bssid): res = self.request("PMKSA") lines = res.splitlines() for l in lines: if bssid not in l: continue vals = dict() [index,aa,pmkid,expiration,opportunistic] = l.split(' ') vals['index'] = index vals['pmkid'] = pmkid vals['expiration'] = expiration vals['opportunistic'] = opportunistic return vals return None