diff --git a/tests/hwsim/test_dbus.py b/tests/hwsim/test_dbus.py index 5affbc8f6..daba14e64 100644 --- a/tests/hwsim/test_dbus.py +++ b/tests/hwsim/test_dbus.py @@ -4086,6 +4086,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev): def __init__(self, bus): TestDbus.__init__(self, bus) self.done = False + self.peer_group_added = False + self.peer_group_removed = False def __enter__(self): gobject.timeout_add(1, self.run_test) @@ -4100,6 +4102,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev): "GroupStarted") self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, "GroupFinished") + self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, + "PropertiesChanged") self.loop.run() return self @@ -4135,7 +4139,19 @@ def test_dbus_p2p_go_neg_init(dev, apdev): def groupFinished(self, properties): logger.debug("groupFinished: " + str(properties)) self.done = True - self.loop.quit() + + def propertiesChanged(self, interface_name, changed_properties, + invalidated_properties): + logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) + if interface_name != WPAS_DBUS_P2P_PEER: + return + if "Groups" not in changed_properties: + return + if len(changed_properties["Groups"]) > 0: + self.peer_group_added = True + if len(changed_properties["Groups"]) == 0: + self.peer_group_removed = True + self.loop.quit() def run_test(self, *args): logger.debug("run_test") @@ -4143,7 +4159,195 @@ def test_dbus_p2p_go_neg_init(dev, apdev): return False def success(self): - return self.done + return self.done and self.peer_group_added and self.peer_group_removed + + with TestDbusP2p(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") + +def test_dbus_p2p_group_termination_by_go(dev, apdev): + """D-Bus P2P group removal on GO terminating the group""" + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) + addr0 = dev[0].p2p_dev_addr() + dev[1].p2p_listen() + + class TestDbusP2p(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.done = False + self.peer_group_added = False + self.peer_group_removed = False + + def __enter__(self): + gobject.timeout_add(1, self.run_test) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, + "DeviceFound") + self.add_signal(self.goNegotiationSuccess, + WPAS_DBUS_IFACE_P2PDEVICE, + "GONegotiationSuccess", + byte_arrays=True) + self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, + "GroupStarted") + self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, + "GroupFinished") + self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, + "PropertiesChanged") + self.loop.run() + return self + + def deviceFound(self, path): + logger.debug("deviceFound: path=%s" % path) + dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') + args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670', + 'go_intent': 0 } + p2p.Connect(args) + + ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) + if ev is None: + raise Exception("Timeout while waiting for GO Neg Request") + dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") + ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15); + if ev is None: + raise Exception("Group formation timed out") + self.sta_group_ev = ev + + def goNegotiationSuccess(self, properties): + logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) + + def groupStarted(self, properties): + logger.debug("groupStarted: " + str(properties)) + g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, + properties['interface_object']) + dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') + dev1.group_form_result(self.sta_group_ev) + dev1.remove_group() + + def groupFinished(self, properties): + logger.debug("groupFinished: " + str(properties)) + self.done = True + + def propertiesChanged(self, interface_name, changed_properties, + invalidated_properties): + logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) + if interface_name != WPAS_DBUS_P2P_PEER: + return + if "Groups" not in changed_properties: + return + if len(changed_properties["Groups"]) > 0: + self.peer_group_added = True + if len(changed_properties["Groups"]) == 0: + self.peer_group_removed = True + self.loop.quit() + + def run_test(self, *args): + logger.debug("run_test") + p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) + return False + + def success(self): + return self.done and self.peer_group_added and self.peer_group_removed + + with TestDbusP2p(bus) as t: + if not t.success(): + raise Exception("Expected signals not seen") + +def test_dbus_p2p_group_idle_timeout(dev, apdev): + """D-Bus P2P group removal on idle timeout""" + try: + dev[0].global_request("SET p2p_group_idle 1") + _test_dbus_p2p_group_idle_timeout(dev, apdev) + finally: + dev[0].global_request("SET p2p_group_idle 0") + +def _test_dbus_p2p_group_idle_timeout(dev, apdev): + (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) + p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) + addr0 = dev[0].p2p_dev_addr() + dev[1].p2p_listen() + + class TestDbusP2p(TestDbus): + def __init__(self, bus): + TestDbus.__init__(self, bus) + self.done = False + self.peer_group_added = False + self.peer_group_removed = False + + def __enter__(self): + gobject.timeout_add(1, self.run_test) + gobject.timeout_add(15000, self.timeout) + self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, + "DeviceFound") + self.add_signal(self.goNegotiationSuccess, + WPAS_DBUS_IFACE_P2PDEVICE, + "GONegotiationSuccess", + byte_arrays=True) + self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, + "GroupStarted") + self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, + "GroupFinished") + self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, + "PropertiesChanged") + self.loop.run() + return self + + def deviceFound(self, path): + logger.debug("deviceFound: path=%s" % path) + dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') + args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670', + 'go_intent': 0 } + p2p.Connect(args) + + ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) + if ev is None: + raise Exception("Timeout while waiting for GO Neg Request") + dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") + ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15); + if ev is None: + raise Exception("Group formation timed out") + self.sta_group_ev = ev + + def goNegotiationSuccess(self, properties): + logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) + + def groupStarted(self, properties): + logger.debug("groupStarted: " + str(properties)) + g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, + properties['interface_object']) + dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') + dev1.group_form_result(self.sta_group_ev) + ifaddr = dev1.group_request("STA-FIRST").splitlines()[0] + # Force disassociation with different reason code so that the + # P2P Client using D-Bus does not get normal group termination event + # from the GO. + dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0") + dev1.remove_group() + + def groupFinished(self, properties): + logger.debug("groupFinished: " + str(properties)) + self.done = True + + def propertiesChanged(self, interface_name, changed_properties, + invalidated_properties): + logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) + if interface_name != WPAS_DBUS_P2P_PEER: + return + if "Groups" not in changed_properties: + return + if len(changed_properties["Groups"]) > 0: + self.peer_group_added = True + if len(changed_properties["Groups"]) == 0: + self.peer_group_removed = True + self.loop.quit() + + def run_test(self, *args): + logger.debug("run_test") + p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) + return False + + def success(self): + return self.done and self.peer_group_added and self.peer_group_removed with TestDbusP2p(bus) as t: if not t.success():