tests: Add RemoteCtrl class
Signed-off-by: Janusz Dziedzic <janusz.dziedzic@gmail.com>
This commit is contained in:
parent
d42cfaa397
commit
d3f340c869
1 changed files with 92 additions and 0 deletions
92
tests/hwsim/remotectrl.py
Normal file
92
tests/hwsim/remotectrl.py
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
#
|
||||||
|
# wpa_supplicant/hostapd control interface using Python
|
||||||
|
# Copyright (c) 2024, Jouni Malinen <j@w1.fi>
|
||||||
|
#
|
||||||
|
# This software may be distributed under the terms of the BSD license.
|
||||||
|
# See README for more details.
|
||||||
|
|
||||||
|
from wpaspy import Ctrl
|
||||||
|
import remotehost
|
||||||
|
|
||||||
|
class RemoteCtrl(Ctrl):
|
||||||
|
def __init__(self, path, port=9877, hostname=None, ifname=None):
|
||||||
|
self.started = False
|
||||||
|
self.attached = False
|
||||||
|
self.path = path
|
||||||
|
self.port = port
|
||||||
|
self.ifname = ifname
|
||||||
|
self.hostname = hostname
|
||||||
|
self.proc = None
|
||||||
|
|
||||||
|
self.host = remotehost.Host(hostname)
|
||||||
|
self.started = True
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.attached:
|
||||||
|
try:
|
||||||
|
self.detach()
|
||||||
|
except Exception as e:
|
||||||
|
# Need to ignore this allow the socket to be closed
|
||||||
|
self.attached = False
|
||||||
|
pass
|
||||||
|
|
||||||
|
if self.host and self.started:
|
||||||
|
self.started = False
|
||||||
|
|
||||||
|
def request(self, cmd, timeout=10):
|
||||||
|
if self.host:
|
||||||
|
cmd = '\'' + cmd + '\''
|
||||||
|
if self.ifname:
|
||||||
|
_cmd = ['wpa_cli', '-p', self.path, '-i', self.ifname, "raw " + cmd]
|
||||||
|
else:
|
||||||
|
_cmd = ['wpa_cli', '-g', self.path, "raw " + cmd]
|
||||||
|
status, buf = self.host.execute(_cmd)
|
||||||
|
return buf
|
||||||
|
|
||||||
|
def attach(self):
|
||||||
|
if self.attached:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.host:
|
||||||
|
if self.ifname:
|
||||||
|
_cmd = [ "wpa_cli", "-p", self.path, "-i", self.ifname ]
|
||||||
|
else:
|
||||||
|
_cmd = [ "wpa_cli", '-g', self.path]
|
||||||
|
self.proc = self.host.proc_run(_cmd)
|
||||||
|
self.attached = True
|
||||||
|
|
||||||
|
def detach(self):
|
||||||
|
if not self.attached:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.hostname and self.proc:
|
||||||
|
self.request("DETACH")
|
||||||
|
self.request("QUIT")
|
||||||
|
self.host.proc_stop(self.proc)
|
||||||
|
self.attached = False
|
||||||
|
self.proc = None
|
||||||
|
|
||||||
|
def terminate(self):
|
||||||
|
if self.attached:
|
||||||
|
try:
|
||||||
|
self.detach()
|
||||||
|
except Exception as e:
|
||||||
|
# Need to ignore this to allow the socket to be closed
|
||||||
|
self.attached = False
|
||||||
|
self.request("TERMINATE")
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def pending(self, timeout=0):
|
||||||
|
if self.host and self.proc:
|
||||||
|
return self.host.proc_pending(self.proc, timeout=timeout)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def recv(self):
|
||||||
|
if self.host and self.proc:
|
||||||
|
res = self.host.proc_read(self.proc)
|
||||||
|
return res
|
||||||
|
return ""
|
Loading…
Reference in a new issue