tests: Out-of-memory cases for D-Bus operations
This increases testing coverage on various error paths. Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
795b6f57a8
commit
0e126c6dca
2 changed files with 602 additions and 2 deletions
|
@ -19,7 +19,7 @@ except ImportError:
|
|||
|
||||
import hostapd
|
||||
from wpasupplicant import WpaSupplicant
|
||||
from utils import HwsimSkip
|
||||
from utils import HwsimSkip, alloc_fail
|
||||
from test_ap_tdls import connect_2sta_open
|
||||
|
||||
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
|
||||
|
@ -70,6 +70,27 @@ class TestDbus(object):
|
|||
self.loop.quit()
|
||||
return False
|
||||
|
||||
class alloc_fail_dbus(object):
|
||||
def __init__(self, dev, count, funcs, operation="Operation",
|
||||
expected="NoMemory"):
|
||||
self._dev = dev
|
||||
self._count = count
|
||||
self._funcs = funcs
|
||||
self._operation = operation
|
||||
self._expected = expected
|
||||
def __enter__(self):
|
||||
cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
|
||||
if "OK" not in self._dev.request(cmd):
|
||||
raise HwsimSkip("TEST_ALLOC_FAIL not supported")
|
||||
def __exit__(self, type, value, traceback):
|
||||
if type is None:
|
||||
raise Exception("%s succeeded during out-of-memory" % self._operation)
|
||||
if type == dbus.exceptions.DBusException and self._expected in str(value):
|
||||
return True
|
||||
if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
|
||||
raise Exception("%s did not trigger allocation failure" % self._operation)
|
||||
return False
|
||||
|
||||
def start_ap(ap):
|
||||
ssid = "test-wps"
|
||||
params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
|
||||
|
@ -424,6 +445,58 @@ def test_dbus_wps_invalid(dev, apdev):
|
|||
if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
|
||||
raise Exception("Unexpected error message: " + str(e))
|
||||
|
||||
def test_dbus_wps_oom(dev, apdev):
|
||||
"""D-Bus WPS operation (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
|
||||
if_obj.Get(WPAS_DBUS_IFACE, "State",
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
|
||||
bssid = apdev[0]['bssid']
|
||||
dev[0].scan_for_bss(bssid, freq=2412)
|
||||
|
||||
for i in range(1, 3):
|
||||
with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
|
||||
if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
|
||||
bss_obj.Get(WPAS_DBUS_BSS, "Rates",
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
id = dev[0].add_network()
|
||||
dev[0].set_network(id, "disabled", "0")
|
||||
dev[0].set_network_quoted(id, "ssid", "test")
|
||||
|
||||
for i in range(1, 3):
|
||||
with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
|
||||
if_obj.Get(WPAS_DBUS_IFACE, "Networks",
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
|
||||
dbus_get(dbus, wpas_obj, "Interfaces")
|
||||
|
||||
for i in range(1, 6):
|
||||
with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
|
||||
dbus_get(dbus, wpas_obj, "EapMethods")
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
|
||||
expected="Error.Failed: Failed to set property"):
|
||||
val2 = "push_button display"
|
||||
if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
|
||||
"WPS.Start",
|
||||
expected="UnknownError: WPS start failed"):
|
||||
wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
|
||||
|
||||
def test_dbus_wps_pbc(dev, apdev):
|
||||
"""D-Bus WPS/PBC operation and signals"""
|
||||
try:
|
||||
|
@ -777,6 +850,38 @@ def test_dbus_scan_invalid(dev, apdev):
|
|||
if err not in str(e):
|
||||
raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
|
||||
|
||||
def test_dbus_scan_oom(dev, apdev):
|
||||
"""D-Bus scan method and OOM"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"wpa_scan_clone_params;wpas_dbus_handler_scan",
|
||||
"Scan", expected="ScanError: Scan request rejected"):
|
||||
iface.Scan({ 'Type': 'passive',
|
||||
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
|
||||
"Scan"):
|
||||
iface.Scan({ 'Type': 'passive',
|
||||
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
|
||||
"Scan"):
|
||||
iface.Scan({ 'Type': 'active',
|
||||
'IEs': [ dbus.ByteArray("\xdd\x00") ],
|
||||
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
|
||||
"Scan"):
|
||||
iface.Scan({ 'Type': 'active',
|
||||
'SSIDs': [ dbus.ByteArray("open"),
|
||||
dbus.ByteArray() ],
|
||||
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
|
||||
|
||||
def test_dbus_scan(dev, apdev):
|
||||
"""D-Bus scan and related signals"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -970,6 +1075,136 @@ def test_dbus_connect(dev, apdev):
|
|||
if not t.success():
|
||||
raise Exception("Expected signals not seen")
|
||||
|
||||
def test_dbus_connect_oom(dev, apdev):
|
||||
"""D-Bus AddNetwork and connect when out-of-memory"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
|
||||
raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
|
||||
|
||||
ssid = "test-wpa2-psk"
|
||||
passphrase = 'qwertyuiop'
|
||||
params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
|
||||
hapd = hostapd.add_ap(apdev[0]['ifname'], params)
|
||||
|
||||
class TestDbusConnect(TestDbus):
|
||||
def __init__(self, bus):
|
||||
TestDbus.__init__(self, bus)
|
||||
self.network_added = False
|
||||
self.network_selected = False
|
||||
self.network_removed = False
|
||||
self.state = 0
|
||||
|
||||
def __enter__(self):
|
||||
gobject.timeout_add(1, self.run_connect)
|
||||
gobject.timeout_add(1500, self.timeout)
|
||||
self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
|
||||
self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
|
||||
"NetworkRemoved")
|
||||
self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
|
||||
"NetworkSelected")
|
||||
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
|
||||
"PropertiesChanged")
|
||||
self.loop.run()
|
||||
return self
|
||||
|
||||
def networkAdded(self, network, properties):
|
||||
logger.debug("networkAdded: %s" % str(network))
|
||||
logger.debug(str(properties))
|
||||
self.network_added = True
|
||||
|
||||
def networkRemoved(self, network):
|
||||
logger.debug("networkRemoved: %s" % str(network))
|
||||
self.network_removed = True
|
||||
|
||||
def networkSelected(self, network):
|
||||
logger.debug("networkSelected: %s" % str(network))
|
||||
self.network_selected = True
|
||||
|
||||
def propertiesChanged(self, properties):
|
||||
logger.debug("propertiesChanged: %s" % str(properties))
|
||||
if 'State' in properties and properties['State'] == "completed":
|
||||
if self.state == 0:
|
||||
self.state = 1
|
||||
iface.Disconnect()
|
||||
elif self.state == 2:
|
||||
self.state = 3
|
||||
iface.Disconnect()
|
||||
elif self.state == 4:
|
||||
self.state = 5
|
||||
iface.Reattach()
|
||||
elif self.state == 5:
|
||||
self.state = 6
|
||||
res = iface.SignalPoll()
|
||||
logger.debug("SignalPoll: " + str(res))
|
||||
if 'frequency' not in res or res['frequency'] != 2412:
|
||||
self.state = -1
|
||||
logger.info("Unexpected SignalPoll result")
|
||||
iface.RemoveNetwork(self.netw)
|
||||
if 'State' in properties and properties['State'] == "disconnected":
|
||||
if self.state == 1:
|
||||
self.state = 2
|
||||
iface.SelectNetwork(self.netw)
|
||||
elif self.state == 3:
|
||||
self.state = 4
|
||||
iface.Reassociate()
|
||||
elif self.state == 6:
|
||||
self.state = 7
|
||||
self.loop.quit()
|
||||
|
||||
def run_connect(self, *args):
|
||||
logger.debug("run_connect")
|
||||
args = dbus.Dictionary({ 'ssid': ssid,
|
||||
'key_mgmt': 'WPA-PSK',
|
||||
'psk': passphrase,
|
||||
'scan_freq': 2412 },
|
||||
signature='sv')
|
||||
try:
|
||||
self.netw = iface.AddNetwork(args)
|
||||
except Exception, e:
|
||||
logger.info("Exception on AddNetwork: " + str(e))
|
||||
self.loop.quit()
|
||||
return False
|
||||
try:
|
||||
iface.SelectNetwork(self.netw)
|
||||
except Exception, e:
|
||||
logger.info("Exception on SelectNetwork: " + str(e))
|
||||
self.loop.quit()
|
||||
|
||||
return False
|
||||
|
||||
def success(self):
|
||||
if not self.network_added or \
|
||||
not self.network_removed or \
|
||||
not self.network_selected:
|
||||
return False
|
||||
return self.state == 7
|
||||
|
||||
count = 0
|
||||
for i in range(1, 1000):
|
||||
for j in range(3):
|
||||
dev[j].dump_monitor()
|
||||
dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
|
||||
try:
|
||||
with TestDbusConnect(bus) as t:
|
||||
if not t.success():
|
||||
logger.info("Iteration %d - Expected signals not seen" % i)
|
||||
else:
|
||||
logger.info("Iteration %d - success" % i)
|
||||
|
||||
state = dev[0].request('GET_ALLOC_FAIL')
|
||||
logger.info("GET_ALLOC_FAIL: " + state)
|
||||
dev[0].dump_monitor()
|
||||
dev[0].request("TEST_ALLOC_FAIL 0:")
|
||||
if i < 3:
|
||||
raise Exception("Connection succeeded during out-of-memory")
|
||||
if not state.startswith('0:'):
|
||||
count += 1
|
||||
if count == 5:
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_dbus_while_not_connected(dev, apdev):
|
||||
"""D-Bus invalid operations while not connected"""
|
||||
|
@ -1169,6 +1404,86 @@ def test_dbus_network(dev, apdev):
|
|||
if "InvalidArgs" not in str(e):
|
||||
raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
|
||||
|
||||
def test_dbus_network_oom(dev, apdev):
|
||||
"""D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
|
||||
'identity': "testuser", 'scan_freq': '2412' },
|
||||
signature='sv')
|
||||
netw1 = iface.AddNetwork(args)
|
||||
net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"wpa_config_get_all;wpas_dbus_getter_network_properties",
|
||||
"Get"):
|
||||
net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||
|
||||
iface.RemoveAllNetworks()
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
|
||||
"RemoveNetwork", "InvalidArgs"):
|
||||
iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
|
||||
|
||||
with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
|
||||
args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
|
||||
signature='sv')
|
||||
try:
|
||||
netw = iface.AddNetwork(args)
|
||||
# Currently, AddNetwork() succeeds even if os_strdup() for path
|
||||
# fails, so remove the network if that occurs.
|
||||
iface.RemoveNetwork(netw)
|
||||
except dbus.exceptions.DBusException, e:
|
||||
pass
|
||||
|
||||
for i in range(1, 3):
|
||||
with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
|
||||
try:
|
||||
netw = iface.AddNetwork(args)
|
||||
# Currently, AddNetwork() succeeds even if network registration
|
||||
# fails, so remove the network if that occurs.
|
||||
iface.RemoveNetwork(netw)
|
||||
except dbus.exceptions.DBusException, e:
|
||||
pass
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpa_config_add_network;wpas_dbus_handler_add_network",
|
||||
"AddNetwork",
|
||||
"UnknownError: wpa_supplicant could not add a network"):
|
||||
args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
|
||||
signature='sv')
|
||||
netw = iface.AddNetwork(args)
|
||||
|
||||
tests = [ (1,
|
||||
'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
|
||||
signature='sv')),
|
||||
(1, '=set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
|
||||
(1, '=set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
|
||||
(1, '=set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'priority': dbus.UInt32(1) },
|
||||
signature='sv')),
|
||||
(1, '=set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'priority': dbus.Int32(1) },
|
||||
signature='sv')),
|
||||
(1, '=set_network_properties;wpas_dbus_handler_add_network',
|
||||
dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
|
||||
signature='sv')) ]
|
||||
for (count,funcs,args) in tests:
|
||||
with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
|
||||
netw = iface.AddNetwork(args)
|
||||
|
||||
if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
|
||||
dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
|
||||
raise Exception("Unexpected network block added")
|
||||
if len(dev[0].list_networks()) > 0:
|
||||
raise Exception("Unexpected network block visible")
|
||||
|
||||
def test_dbus_interface(dev, apdev):
|
||||
"""D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1227,6 +1542,41 @@ def test_dbus_interface(dev, apdev):
|
|||
if "InterfaceUnknown" not in str(e):
|
||||
raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
|
||||
|
||||
def test_dbus_interface_oom(dev, apdev):
|
||||
"""D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
|
||||
params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
|
||||
signature='sv')
|
||||
wpas.CreateInterface(params)
|
||||
|
||||
for i in range(1, 1000):
|
||||
dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
|
||||
params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
|
||||
signature='sv')
|
||||
try:
|
||||
npath = wpas.CreateInterface(params)
|
||||
wpas.RemoveInterface(npath)
|
||||
logger.info("CreateInterface succeeds after %d allocation failures" % i)
|
||||
state = dev[0].request('GET_ALLOC_FAIL')
|
||||
logger.info("GET_ALLOC_FAIL: " + state)
|
||||
dev[0].dump_monitor()
|
||||
dev[0].request("TEST_ALLOC_FAIL 0:")
|
||||
if i < 5:
|
||||
raise Exception("CreateInterface succeeded during out-of-memory")
|
||||
if not state.startswith('0:'):
|
||||
break
|
||||
except dbus.exceptions.DBusException, e:
|
||||
pass
|
||||
|
||||
for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
|
||||
"CreateInterface"):
|
||||
params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
|
||||
wpas.CreateInterface(params)
|
||||
|
||||
def test_dbus_blob(dev, apdev):
|
||||
"""D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1302,6 +1652,16 @@ def test_dbus_blob(dev, apdev):
|
|||
if not t.success():
|
||||
raise Exception("Expected signals not seen")
|
||||
|
||||
def test_dbus_blob_oom(dev, apdev):
|
||||
"""D-Bus AddNetwork/RemoveNetwork OOM error cases"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
for i in range(1, 4):
|
||||
with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
|
||||
"AddBlob"):
|
||||
iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
|
||||
|
||||
def test_dbus_autoscan(dev, apdev):
|
||||
"""D-Bus Autoscan()"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1312,6 +1672,15 @@ def test_dbus_autoscan(dev, apdev):
|
|||
iface.AutoScan("")
|
||||
dev[0].request("AUTOSCAN ")
|
||||
|
||||
def test_dbus_autoscan_oom(dev, apdev):
|
||||
"""D-Bus Autoscan() OOM"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
|
||||
iface.AutoScan("foo")
|
||||
dev[0].request("AUTOSCAN ")
|
||||
|
||||
def test_dbus_tdls_invalid(dev, apdev):
|
||||
"""D-Bus invalid TDLS operations"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1360,6 +1729,15 @@ def test_dbus_tdls_invalid(dev, apdev):
|
|||
if "UnknownError: error performing TDLS teardown" not in str(e):
|
||||
raise Exception("Unexpected error message: " + str(e))
|
||||
|
||||
def test_dbus_tdls_oom(dev, apdev):
|
||||
"""D-Bus TDLS operations during OOM"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
|
||||
"UnknownError: error performing TDLS setup"):
|
||||
iface.TDLSSetup("00:11:22:33:44:55")
|
||||
|
||||
def test_dbus_tdls(dev, apdev):
|
||||
"""D-Bus TDLS"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1734,6 +2112,15 @@ def test_dbus_probe_req_reporting(dev, apdev):
|
|||
dev[1].p2p_stop_find()
|
||||
dev[0].remove_group()
|
||||
|
||||
def test_dbus_probe_req_reporting_oom(dev, apdev):
|
||||
"""D-Bus Probe Request reporting (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
|
||||
"SubscribeProbeReq"):
|
||||
iface.SubscribeProbeReq()
|
||||
|
||||
def test_dbus_p2p_invalid(dev, apdev):
|
||||
"""D-Bus invalid P2P operations"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -1943,6 +2330,63 @@ def test_dbus_p2p_invalid(dev, apdev):
|
|||
finally:
|
||||
dev[0].request("P2P_SET disabled 0")
|
||||
|
||||
def test_dbus_p2p_oom(dev, apdev):
|
||||
"""D-Bus P2P operations and OOM"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123'),
|
||||
dbus.ByteArray('123') ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
|
||||
"Find", "InvalidArgs"):
|
||||
p2p.Find(dbus.Dictionary({ 'Foo': path }))
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
|
||||
"AddService", "InvalidArgs"):
|
||||
args = { 'service_type': 'bonjour',
|
||||
'response': dbus.ByteArray(500*'b') }
|
||||
p2p.AddService(args)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
|
||||
"AddService", "InvalidArgs"):
|
||||
p2p.AddService(args)
|
||||
|
||||
def test_dbus_p2p_discovery(dev, apdev):
|
||||
"""D-Bus P2P discovery"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -3653,3 +4097,47 @@ def test_dbus_p2p_two_groups(dev, apdev):
|
|||
raise Exception("Expected signals not seen")
|
||||
|
||||
dev[1].remove_group()
|
||||
|
||||
def test_dbus_introspect(dev, apdev):
|
||||
"""D-Bus introspection"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
||||
res = if_obj.Introspect(WPAS_DBUS_IFACE,
|
||||
dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
||||
logger.info("Initial Introspect: " + str(res))
|
||||
if res is None or "Introspectable" not in res or "GroupStarted" not in res:
|
||||
raise Exception("Unexpected initial Introspect response: " + str(res))
|
||||
|
||||
with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
|
||||
res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
|
||||
dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
||||
logger.info("Introspect: " + str(res2))
|
||||
if res2 is not None:
|
||||
raise Exception("Unexpected Introspect response")
|
||||
|
||||
with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
|
||||
res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
|
||||
dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
||||
logger.info("Introspect: " + str(res2))
|
||||
if res2 is None:
|
||||
raise Exception("No Introspect response")
|
||||
if len(res2) >= len(res):
|
||||
raise Exception("Unexpected Introspect response")
|
||||
|
||||
with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
|
||||
res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
|
||||
dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
||||
logger.info("Introspect: " + str(res2))
|
||||
if res2 is None:
|
||||
raise Exception("No Introspect response")
|
||||
if len(res2) >= len(res):
|
||||
raise Exception("Unexpected Introspect response")
|
||||
|
||||
with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
|
||||
res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
|
||||
dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
||||
logger.info("Introspect: " + str(res2))
|
||||
if res2 is None:
|
||||
raise Exception("No Introspect response")
|
||||
if len(res2) >= len(res):
|
||||
raise Exception("Unexpected Introspect response")
|
||||
|
|
|
@ -16,7 +16,7 @@ except ImportError:
|
|||
|
||||
import hostapd
|
||||
from utils import HwsimSkip
|
||||
from test_dbus import TestDbus, start_ap
|
||||
from test_dbus import TestDbus, alloc_fail_dbus, start_ap
|
||||
|
||||
WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
|
||||
WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
|
||||
|
@ -237,6 +237,24 @@ def test_dbus_old_smartcard(dev, apdev):
|
|||
if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
|
||||
raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
|
||||
|
||||
def test_dbus_old_smartcard_oom(dev, apdev):
|
||||
"""The old D-Bus interface - smartcard (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
||||
for arg in [ 'opensc_engine_path', 'pkcs11_engine_path', 'pkcs11_module_path' ]:
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpas_dbus_iface_set_smartcard_modules",
|
||||
"setSmartcardModules",
|
||||
"InvalidOptions"):
|
||||
params = dbus.Dictionary({ arg : "foo", }, signature='sv')
|
||||
if_obj.setSmartcardModules(params,
|
||||
dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_smartcard_modules",
|
||||
"setSmartcardModules", "InvalidOptions"):
|
||||
params = dbus.Dictionary({ arg : "foo", }, signature='sv')
|
||||
if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
def test_dbus_old_interface(dev, apdev):
|
||||
"""The old D-Bus interface - interface get/add/remove"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -295,6 +313,22 @@ def test_dbus_old_interface(dev, apdev):
|
|||
if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
|
||||
raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
|
||||
|
||||
def test_dbus_old_interface_oom(dev, apdev):
|
||||
"""The old D-Bus interface - interface get/add/remove (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_global_add_interface",
|
||||
"addInterface", "InvalidOptions"):
|
||||
params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
|
||||
wpas.addInterface("lo", params)
|
||||
|
||||
for arg in [ "driver", "driver-params", "config-file", "bridge-ifname" ]:
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_global_add_interface",
|
||||
"addInterface", "InvalidOptions"):
|
||||
params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
|
||||
wpas.addInterface("lo", params)
|
||||
|
||||
def test_dbus_old_blob(dev, apdev):
|
||||
"""The old D-Bus interface - blob operations"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -335,6 +369,31 @@ def test_dbus_old_blob(dev, apdev):
|
|||
if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
def test_dbus_old_blob_oom(dev, apdev):
|
||||
"""The old D-Bus interface - blob operations (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
||||
blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
|
||||
'blob2': dbus.ByteArray([ 1, 2 ]) },
|
||||
signature='sv')
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_blobs", "setBlobs",
|
||||
"AddError: Not enough memory to add blob"):
|
||||
if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 2, "=wpas_dbus_iface_set_blobs", "setBlobs",
|
||||
"AddError: Not enough memory to add blob data"):
|
||||
if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 3, "=wpas_dbus_iface_set_blobs", "setBlobs",
|
||||
"AddError: Error adding blob"):
|
||||
if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_decompose_object_path;wpas_iface_message_handler",
|
||||
"setBlobs",
|
||||
"InvalidInterface: wpa_supplicant knows nothing about this interface"):
|
||||
if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
def test_dbus_old_connect(dev, apdev):
|
||||
"""The old D-Bus interface - add a network and connect"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
@ -725,3 +784,56 @@ def _test_dbus_old_wps_reg(dev, apdev):
|
|||
raise Exception("Expected signals not seen")
|
||||
|
||||
dev[0].wait_connected(timeout=10)
|
||||
|
||||
def test_dbus_old_wps_oom(dev, apdev):
|
||||
"""The old D-Bus interface and WPS (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
bssid = apdev[0]['bssid']
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpa_config_add_network;wpas_dbus_iface_wps_pbc",
|
||||
"wpsPbc",
|
||||
"WpsPbcError: Could not start PBC negotiation"):
|
||||
if_obj.wpsPbc("any", dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpa_config_add_network;wpas_dbus_iface_wps_pin",
|
||||
"wpsPin", "WpsPinError: Could not init PIN"):
|
||||
if_obj.wpsPin("any", "", dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpa_config_add_network;wpas_dbus_iface_wps_reg",
|
||||
"wpsReg",
|
||||
"WpsRegError: Could not request credentials"):
|
||||
if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
def test_dbus_old_network_set_oom(dev, apdev):
|
||||
"""The old D-Bus interface and network set method (OOM)"""
|
||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"=wpa_config_add_network;wpas_dbus_iface_add_network",
|
||||
"addNetwork",
|
||||
"AddNetworkError: wpa_supplicant could not add"):
|
||||
if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
|
||||
path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
|
||||
netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
|
||||
netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
|
||||
|
||||
with alloc_fail_dbus(dev[0], 1,
|
||||
"_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_network",
|
||||
"set", "InvalidOptions"):
|
||||
params = dbus.Dictionary({ 'ssid': "foo" }, signature='sv')
|
||||
netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
|
||||
|
||||
tests = [ { 'identity': dbus.ByteArray([ 1, 2 ]) },
|
||||
{ 'scan_freq': dbus.UInt32(2412) },
|
||||
{ 'priority': dbus.Int32(0) },
|
||||
{ 'identity': "user" },
|
||||
{ 'eap': "TLS" }]
|
||||
for arg in tests:
|
||||
with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_network",
|
||||
"set", "InvalidOptions"):
|
||||
params = dbus.Dictionary(arg, signature='sv')
|
||||
netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
|
||||
|
|
Loading…
Reference in a new issue