hostapd/wpaspy/wpaspy.py
Jouni Malinen 19318861a5 wpaspy: Handle DETACH response more robustly
There could be pending unsolicited event messages on the monitor socket
when the DETACH command is issued. As such, the response may be
something else then OK even if the actual detach operation succeeded.
Try to avoid this be dropping pending messages before issuing the detach
command. As an additional workaround, check the response against FAIL
instead of requiring OK so that the self.attached does not get left to
True incorrectly even if an additional event message were to be
received.

Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
2014-04-28 16:54:09 +03:00

84 lines
2.2 KiB
Python

#!/usr/bin/python
#
# wpa_supplicant/hostapd control interface using Python
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import os
import socket
import select
counter = 0
class Ctrl:
def __init__(self, path):
global counter
self.started = False
self.attached = False
self.s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.dest = path
self.local = "/tmp/wpa_ctrl_" + str(os.getpid()) + '-' + str(counter)
counter += 1
self.s.bind(self.local)
try:
self.s.connect(self.dest)
except Exception, e:
self.s.close()
os.unlink(self.local)
raise
self.started = True
def __del__(self):
self.close()
def close(self):
if self.attached:
try:
self.detach()
except Exception, e:
# Need to ignore this allow the socket to be closed
self.attached = False
pass
if self.started:
self.s.close()
os.unlink(self.local)
self.started = False
def request(self, cmd, timeout=10):
self.s.send(cmd)
[r, w, e] = select.select([self.s], [], [], timeout)
if r:
return self.s.recv(4096)
raise Exception("Timeout on waiting response")
def attach(self):
if self.attached:
return None
res = self.request("ATTACH")
if "OK" in res:
self.attached = True
return None
raise Exception("ATTACH failed")
def detach(self):
if not self.attached:
return None
while self.pending():
ev = self.recv()
res = self.request("DETACH")
if "FAIL" not in res:
self.attached = False
return None
raise Exception("DETACH failed")
def pending(self, timeout=0):
[r, w, e] = select.select([self.s], [], [], timeout)
if r:
return True
return False
def recv(self):
res = self.s.recv(4096)
return res