From 0e126c6dca0427dd792d6be5996fa3de2e529935 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Tue, 6 Jan 2015 13:33:32 +0200 Subject: [PATCH] tests: Out-of-memory cases for D-Bus operations This increases testing coverage on various error paths. Signed-off-by: Jouni Malinen --- tests/hwsim/test_dbus.py | 490 ++++++++++++++++++++++++++++++++++- tests/hwsim/test_dbus_old.py | 114 +++++++- 2 files changed, 602 insertions(+), 2 deletions(-) diff --git a/tests/hwsim/test_dbus.py b/tests/hwsim/test_dbus.py index 6430722ed..142ef9ff8 100644 --- a/tests/hwsim/test_dbus.py +++ b/tests/hwsim/test_dbus.py @@ -19,7 +19,7 @@ except ImportError: import hostapd from wpasupplicant import WpaSupplicant -from utils import HwsimSkip +from utils import HwsimSkip, alloc_fail from test_ap_tdls import connect_2sta_open WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1" @@ -70,6 +70,27 @@ class TestDbus(object): self.loop.quit() return False +class alloc_fail_dbus(object): + def __init__(self, dev, count, funcs, operation="Operation", + expected="NoMemory"): + self._dev = dev + self._count = count + self._funcs = funcs + self._operation = operation + self._expected = expected + def __enter__(self): + cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs) + if "OK" not in self._dev.request(cmd): + raise HwsimSkip("TEST_ALLOC_FAIL not supported") + def __exit__(self, type, value, traceback): + if type is None: + raise Exception("%s succeeded during out-of-memory" % self._operation) + if type == dbus.exceptions.DBusException and self._expected in str(value): + return True + if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs: + raise Exception("%s did not trigger allocation failure" % self._operation) + return False + def start_ap(ap): ssid = "test-wps" params = { "ssid": ssid, "eap_server": "1", "wps_state": "2", @@ -424,6 +445,58 @@ def test_dbus_wps_invalid(dev, apdev): if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"): raise Exception("Unexpected error message: " + str(e)) +def test_dbus_wps_oom(dev, apdev): + """D-Bus WPS operation (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) + + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"): + if_obj.Get(WPAS_DBUS_IFACE, "State", + dbus_interface=dbus.PROPERTIES_IFACE) + + hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" }) + bssid = apdev[0]['bssid'] + dev[0].scan_for_bss(bssid, freq=2412) + + for i in range(1, 3): + with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"): + if_obj.Get(WPAS_DBUS_IFACE, "BSSs", + dbus_interface=dbus.PROPERTIES_IFACE) + + res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', + dbus_interface=dbus.PROPERTIES_IFACE) + bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"): + bss_obj.Get(WPAS_DBUS_BSS, "Rates", + dbus_interface=dbus.PROPERTIES_IFACE) + + id = dev[0].add_network() + dev[0].set_network(id, "disabled", "0") + dev[0].set_network_quoted(id, "ssid", "test") + + for i in range(1, 3): + with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"): + if_obj.Get(WPAS_DBUS_IFACE, "Networks", + dbus_interface=dbus.PROPERTIES_IFACE) + + with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"): + dbus_get(dbus, wpas_obj, "Interfaces") + + for i in range(1, 6): + with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"): + dbus_get(dbus, wpas_obj, "EapMethods") + + with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set", + expected="Error.Failed: Failed to set property"): + val2 = "push_button display" + if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, + dbus_interface=dbus.PROPERTIES_IFACE) + + with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start", + "WPS.Start", + expected="UnknownError: WPS start failed"): + wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'}) + def test_dbus_wps_pbc(dev, apdev): """D-Bus WPS/PBC operation and signals""" try: @@ -777,6 +850,38 @@ def test_dbus_scan_invalid(dev, apdev): if err not in str(e): raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e))) +def test_dbus_scan_oom(dev, apdev): + """D-Bus scan method and OOM""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + with alloc_fail_dbus(dev[0], 1, + "wpa_scan_clone_params;wpas_dbus_handler_scan", + "Scan", expected="ScanError: Scan request rejected"): + iface.Scan({ 'Type': 'passive', + 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) + + with alloc_fail_dbus(dev[0], 1, + "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan", + "Scan"): + iface.Scan({ 'Type': 'passive', + 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) + + with alloc_fail_dbus(dev[0], 1, + "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan", + "Scan"): + iface.Scan({ 'Type': 'active', + 'IEs': [ dbus.ByteArray("\xdd\x00") ], + 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) + + with alloc_fail_dbus(dev[0], 1, + "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan", + "Scan"): + iface.Scan({ 'Type': 'active', + 'SSIDs': [ dbus.ByteArray("open"), + dbus.ByteArray() ], + 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) + def test_dbus_scan(dev, apdev): """D-Bus scan and related signals""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -970,6 +1075,136 @@ def test_dbus_connect(dev, apdev): if not t.success(): raise Exception("Expected signals not seen") +def test_dbus_connect_oom(dev, apdev): + """D-Bus AddNetwork and connect when out-of-memory""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"): + raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build") + + ssid = "test-wpa2-psk" + passphrase = 'qwertyuiop' + params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) + hapd = hostapd.add_ap(apdev[0]['ifname'], params) + + class TestDbusConnect(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.network_added = False + self.network_selected = False + self.network_removed = False + self.state = 0 + + def __enter__(self): + gobject.timeout_add(1, self.run_connect) + gobject.timeout_add(1500, self.timeout) + self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") + self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, + "NetworkRemoved") + self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, + "NetworkSelected") + self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, + "PropertiesChanged") + self.loop.run() + return self + + def networkAdded(self, network, properties): + logger.debug("networkAdded: %s" % str(network)) + logger.debug(str(properties)) + self.network_added = True + + def networkRemoved(self, network): + logger.debug("networkRemoved: %s" % str(network)) + self.network_removed = True + + def networkSelected(self, network): + logger.debug("networkSelected: %s" % str(network)) + self.network_selected = True + + def propertiesChanged(self, properties): + logger.debug("propertiesChanged: %s" % str(properties)) + if 'State' in properties and properties['State'] == "completed": + if self.state == 0: + self.state = 1 + iface.Disconnect() + elif self.state == 2: + self.state = 3 + iface.Disconnect() + elif self.state == 4: + self.state = 5 + iface.Reattach() + elif self.state == 5: + self.state = 6 + res = iface.SignalPoll() + logger.debug("SignalPoll: " + str(res)) + if 'frequency' not in res or res['frequency'] != 2412: + self.state = -1 + logger.info("Unexpected SignalPoll result") + iface.RemoveNetwork(self.netw) + if 'State' in properties and properties['State'] == "disconnected": + if self.state == 1: + self.state = 2 + iface.SelectNetwork(self.netw) + elif self.state == 3: + self.state = 4 + iface.Reassociate() + elif self.state == 6: + self.state = 7 + self.loop.quit() + + def run_connect(self, *args): + logger.debug("run_connect") + args = dbus.Dictionary({ 'ssid': ssid, + 'key_mgmt': 'WPA-PSK', + 'psk': passphrase, + 'scan_freq': 2412 }, + signature='sv') + try: + self.netw = iface.AddNetwork(args) + except Exception, e: + logger.info("Exception on AddNetwork: " + str(e)) + self.loop.quit() + return False + try: + iface.SelectNetwork(self.netw) + except Exception, e: + logger.info("Exception on SelectNetwork: " + str(e)) + self.loop.quit() + + return False + + def success(self): + if not self.network_added or \ + not self.network_removed or \ + not self.network_selected: + return False + return self.state == 7 + + count = 0 + for i in range(1, 1000): + for j in range(3): + dev[j].dump_monitor() + dev[0].request("TEST_ALLOC_FAIL %d:main" % i) + try: + with TestDbusConnect(bus) as t: + if not t.success(): + logger.info("Iteration %d - Expected signals not seen" % i) + else: + logger.info("Iteration %d - success" % i) + + state = dev[0].request('GET_ALLOC_FAIL') + logger.info("GET_ALLOC_FAIL: " + state) + dev[0].dump_monitor() + dev[0].request("TEST_ALLOC_FAIL 0:") + if i < 3: + raise Exception("Connection succeeded during out-of-memory") + if not state.startswith('0:'): + count += 1 + if count == 5: + break + except: + pass def test_dbus_while_not_connected(dev, apdev): """D-Bus invalid operations while not connected""" @@ -1169,6 +1404,86 @@ def test_dbus_network(dev, apdev): if "InvalidArgs" not in str(e): raise Exception("Unexpected error message for invalid AddNetwork: " + str(e)) +def test_dbus_network_oom(dev, apdev): + """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE', + 'identity': "testuser", 'scan_freq': '2412' }, + signature='sv') + netw1 = iface.AddNetwork(args) + net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) + + with alloc_fail_dbus(dev[0], 1, + "wpa_config_get_all;wpas_dbus_getter_network_properties", + "Get"): + net_obj.Get(WPAS_DBUS_NETWORK, "Properties", + dbus_interface=dbus.PROPERTIES_IFACE) + + iface.RemoveAllNetworks() + + with alloc_fail_dbus(dev[0], 1, + "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network", + "RemoveNetwork", "InvalidArgs"): + iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")) + + with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"): + args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' }, + signature='sv') + try: + netw = iface.AddNetwork(args) + # Currently, AddNetwork() succeeds even if os_strdup() for path + # fails, so remove the network if that occurs. + iface.RemoveNetwork(netw) + except dbus.exceptions.DBusException, e: + pass + + for i in range(1, 3): + with alloc_fail(dev[0], i, "=wpas_dbus_register_network"): + try: + netw = iface.AddNetwork(args) + # Currently, AddNetwork() succeeds even if network registration + # fails, so remove the network if that occurs. + iface.RemoveNetwork(netw) + except dbus.exceptions.DBusException, e: + pass + + with alloc_fail_dbus(dev[0], 1, + "=wpa_config_add_network;wpas_dbus_handler_add_network", + "AddNetwork", + "UnknownError: wpa_supplicant could not add a network"): + args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' }, + signature='sv') + netw = iface.AddNetwork(args) + + tests = [ (1, + 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, + signature='sv')), + (1, '=set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')), + (1, '=set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'eap': 'foo' }, signature='sv')), + (1, '=set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'priority': dbus.UInt32(1) }, + signature='sv')), + (1, '=set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'priority': dbus.Int32(1) }, + signature='sv')), + (1, '=set_network_properties;wpas_dbus_handler_add_network', + dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, + signature='sv')) ] + for (count,funcs,args) in tests: + with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"): + netw = iface.AddNetwork(args) + + if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks', + dbus_interface=dbus.PROPERTIES_IFACE)) > 0: + raise Exception("Unexpected network block added") + if len(dev[0].list_networks()) > 0: + raise Exception("Unexpected network block visible") + def test_dbus_interface(dev, apdev): """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1227,6 +1542,41 @@ def test_dbus_interface(dev, apdev): if "InterfaceUnknown" not in str(e): raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) +def test_dbus_interface_oom(dev, apdev): + """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) + + with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"): + params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' }, + signature='sv') + wpas.CreateInterface(params) + + for i in range(1, 1000): + dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i) + params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' }, + signature='sv') + try: + npath = wpas.CreateInterface(params) + wpas.RemoveInterface(npath) + logger.info("CreateInterface succeeds after %d allocation failures" % i) + state = dev[0].request('GET_ALLOC_FAIL') + logger.info("GET_ALLOC_FAIL: " + state) + dev[0].dump_monitor() + dev[0].request("TEST_ALLOC_FAIL 0:") + if i < 5: + raise Exception("CreateInterface succeeded during out-of-memory") + if not state.startswith('0:'): + break + except dbus.exceptions.DBusException, e: + pass + + for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]: + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface", + "CreateInterface"): + params = dbus.Dictionary({ arg: 'foo' }, signature='sv') + wpas.CreateInterface(params) + def test_dbus_blob(dev, apdev): """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1302,6 +1652,16 @@ def test_dbus_blob(dev, apdev): if not t.success(): raise Exception("Expected signals not seen") +def test_dbus_blob_oom(dev, apdev): + """D-Bus AddNetwork/RemoveNetwork OOM error cases""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + for i in range(1, 4): + with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob", + "AddBlob"): + iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04")) + def test_dbus_autoscan(dev, apdev): """D-Bus Autoscan()""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1312,6 +1672,15 @@ def test_dbus_autoscan(dev, apdev): iface.AutoScan("") dev[0].request("AUTOSCAN ") +def test_dbus_autoscan_oom(dev, apdev): + """D-Bus Autoscan() OOM""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"): + iface.AutoScan("foo") + dev[0].request("AUTOSCAN ") + def test_dbus_tdls_invalid(dev, apdev): """D-Bus invalid TDLS operations""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1360,6 +1729,15 @@ def test_dbus_tdls_invalid(dev, apdev): if "UnknownError: error performing TDLS teardown" not in str(e): raise Exception("Unexpected error message: " + str(e)) +def test_dbus_tdls_oom(dev, apdev): + """D-Bus TDLS operations during OOM""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup", + "UnknownError: error performing TDLS setup"): + iface.TDLSSetup("00:11:22:33:44:55") + def test_dbus_tdls(dev, apdev): """D-Bus TDLS""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1734,6 +2112,15 @@ def test_dbus_probe_req_reporting(dev, apdev): dev[1].p2p_stop_find() dev[0].remove_group() +def test_dbus_probe_req_reporting_oom(dev, apdev): + """D-Bus Probe Request reporting (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) + + with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq", + "SubscribeProbeReq"): + iface.SubscribeProbeReq() + def test_dbus_p2p_invalid(dev, apdev): """D-Bus invalid P2P operations""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -1943,6 +2330,63 @@ def test_dbus_p2p_invalid(dev, apdev): finally: dev[0].request("P2P_SET disabled 0") +def test_dbus_p2p_oom(dev, apdev): + """D-Bus P2P operations and OOM""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) + + with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] })) + + with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] })) + + with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] })) + + with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + + with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + + with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123'), + dbus.ByteArray('123') ] })) + + with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + + with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find", + "Find", "InvalidArgs"): + p2p.Find(dbus.Dictionary({ 'Foo': path })) + + with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array", + "AddService", "InvalidArgs"): + args = { 'service_type': 'bonjour', + 'response': dbus.ByteArray(500*'b') } + p2p.AddService(args) + + with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array", + "AddService", "InvalidArgs"): + p2p.AddService(args) + def test_dbus_p2p_discovery(dev, apdev): """D-Bus P2P discovery""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -3653,3 +4097,47 @@ def test_dbus_p2p_two_groups(dev, apdev): raise Exception("Expected signals not seen") dev[1].remove_group() + +def test_dbus_introspect(dev, apdev): + """D-Bus introspection""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + + res = if_obj.Introspect(WPAS_DBUS_IFACE, + dbus_interface=dbus.INTROSPECTABLE_IFACE) + logger.info("Initial Introspect: " + str(res)) + if res is None or "Introspectable" not in res or "GroupStarted" not in res: + raise Exception("Unexpected initial Introspect response: " + str(res)) + + with alloc_fail(dev[0], 1, "wpa_dbus_introspect"): + res2 = if_obj.Introspect(WPAS_DBUS_IFACE, + dbus_interface=dbus.INTROSPECTABLE_IFACE) + logger.info("Introspect: " + str(res2)) + if res2 is not None: + raise Exception("Unexpected Introspect response") + + with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"): + res2 = if_obj.Introspect(WPAS_DBUS_IFACE, + dbus_interface=dbus.INTROSPECTABLE_IFACE) + logger.info("Introspect: " + str(res2)) + if res2 is None: + raise Exception("No Introspect response") + if len(res2) >= len(res): + raise Exception("Unexpected Introspect response") + + with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"): + res2 = if_obj.Introspect(WPAS_DBUS_IFACE, + dbus_interface=dbus.INTROSPECTABLE_IFACE) + logger.info("Introspect: " + str(res2)) + if res2 is None: + raise Exception("No Introspect response") + if len(res2) >= len(res): + raise Exception("Unexpected Introspect response") + + with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"): + res2 = if_obj.Introspect(WPAS_DBUS_IFACE, + dbus_interface=dbus.INTROSPECTABLE_IFACE) + logger.info("Introspect: " + str(res2)) + if res2 is None: + raise Exception("No Introspect response") + if len(res2) >= len(res): + raise Exception("Unexpected Introspect response") diff --git a/tests/hwsim/test_dbus_old.py b/tests/hwsim/test_dbus_old.py index bc7cda0d1..e677e78c0 100644 --- a/tests/hwsim/test_dbus_old.py +++ b/tests/hwsim/test_dbus_old.py @@ -16,7 +16,7 @@ except ImportError: import hostapd from utils import HwsimSkip -from test_dbus import TestDbus, start_ap +from test_dbus import TestDbus, alloc_fail_dbus, start_ap WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant" WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant" @@ -237,6 +237,24 @@ def test_dbus_old_smartcard(dev, apdev): if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"): raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e))) +def test_dbus_old_smartcard_oom(dev, apdev): + """The old D-Bus interface - smartcard (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + + for arg in [ 'opensc_engine_path', 'pkcs11_engine_path', 'pkcs11_module_path' ]: + with alloc_fail_dbus(dev[0], 1, + "=wpas_dbus_iface_set_smartcard_modules", + "setSmartcardModules", + "InvalidOptions"): + params = dbus.Dictionary({ arg : "foo", }, signature='sv') + if_obj.setSmartcardModules(params, + dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_smartcard_modules", + "setSmartcardModules", "InvalidOptions"): + params = dbus.Dictionary({ arg : "foo", }, signature='sv') + if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE) + def test_dbus_old_interface(dev, apdev): """The old D-Bus interface - interface get/add/remove""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -295,6 +313,22 @@ def test_dbus_old_interface(dev, apdev): if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"): raise Exception("Unexpected error message for invalid removeInterface: " + str(e)) +def test_dbus_old_interface_oom(dev, apdev): + """The old D-Bus interface - interface get/add/remove (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE) + + with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_global_add_interface", + "addInterface", "InvalidOptions"): + params = dbus.Dictionary({ 'driver': 'none' }, signature='sv') + wpas.addInterface("lo", params) + + for arg in [ "driver", "driver-params", "config-file", "bridge-ifname" ]: + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_global_add_interface", + "addInterface", "InvalidOptions"): + params = dbus.Dictionary({ arg: 'foo' }, signature='sv') + wpas.addInterface("lo", params) + def test_dbus_old_blob(dev, apdev): """The old D-Bus interface - blob operations""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -335,6 +369,31 @@ def test_dbus_old_blob(dev, apdev): if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE) if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE) +def test_dbus_old_blob_oom(dev, apdev): + """The old D-Bus interface - blob operations (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + + blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]), + 'blob2': dbus.ByteArray([ 1, 2 ]) }, + signature='sv') + + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_blobs", "setBlobs", + "AddError: Not enough memory to add blob"): + if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 2, "=wpas_dbus_iface_set_blobs", "setBlobs", + "AddError: Not enough memory to add blob data"): + if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 3, "=wpas_dbus_iface_set_blobs", "setBlobs", + "AddError: Error adding blob"): + if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_decompose_object_path;wpas_iface_message_handler", + "setBlobs", + "InvalidInterface: wpa_supplicant knows nothing about this interface"): + if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE) + def test_dbus_old_connect(dev, apdev): """The old D-Bus interface - add a network and connect""" (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) @@ -725,3 +784,56 @@ def _test_dbus_old_wps_reg(dev, apdev): raise Exception("Expected signals not seen") dev[0].wait_connected(timeout=10) + +def test_dbus_old_wps_oom(dev, apdev): + """The old D-Bus interface and WPS (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + bssid = apdev[0]['bssid'] + + with alloc_fail_dbus(dev[0], 1, + "=wpa_config_add_network;wpas_dbus_iface_wps_pbc", + "wpsPbc", + "WpsPbcError: Could not start PBC negotiation"): + if_obj.wpsPbc("any", dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 1, + "=wpa_config_add_network;wpas_dbus_iface_wps_pin", + "wpsPin", "WpsPinError: Could not init PIN"): + if_obj.wpsPin("any", "", dbus_interface=WPAS_DBUS_OLD_IFACE) + + with alloc_fail_dbus(dev[0], 1, + "=wpa_config_add_network;wpas_dbus_iface_wps_reg", + "wpsReg", + "WpsRegError: Could not request credentials"): + if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE) + +def test_dbus_old_network_set_oom(dev, apdev): + """The old D-Bus interface and network set method (OOM)""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + + with alloc_fail_dbus(dev[0], 1, + "=wpa_config_add_network;wpas_dbus_iface_add_network", + "addNetwork", + "AddNetworkError: wpa_supplicant could not add"): + if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE) + + path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE) + netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path) + netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK) + + with alloc_fail_dbus(dev[0], 1, + "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_network", + "set", "InvalidOptions"): + params = dbus.Dictionary({ 'ssid': "foo" }, signature='sv') + netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK) + + tests = [ { 'identity': dbus.ByteArray([ 1, 2 ]) }, + { 'scan_freq': dbus.UInt32(2412) }, + { 'priority': dbus.Int32(0) }, + { 'identity': "user" }, + { 'eap': "TLS" }] + for arg in tests: + with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_network", + "set", "InvalidOptions"): + params = dbus.Dictionary(arg, signature='sv') + netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)