tests: D-Bus interface for NAN USD

Signed-off-by: Jouni Malinen <quic_jouni@quicinc.com>
This commit is contained in:
Jouni Malinen 2024-09-14 18:24:24 +03:00 committed by Jouni Malinen
parent 85cd98976d
commit 5ace39b0a4

View file

@ -13,6 +13,7 @@ import shutil
import struct
import sys
from test_ap_hs20 import hs20_ap_params
from test_nan_usd import check_nan_usd_capab, split_nan_event
try:
if sys.version_info[0] > 2:
@ -6424,3 +6425,240 @@ def test_dbus_bss_anqp_properties(dev, apdev):
with TestDbusANQPBSSPropertiesChanged(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
def test_dbus_nan_usd_publish(dev, apdev):
"""D-Bus NAN USD publish"""
check_nan_usd_capab(dev[0])
(bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
class TestDbusNANUSD(TestDbus):
def __init__(self, bus):
TestDbus.__init__(self, bus)
self.publish_terminated = False
def __enter__(self):
gobject.timeout_add(1, self.start_publish)
gobject.timeout_add(500, self.stop_publish)
gobject.timeout_add(15000, self.timeout)
self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
"NANPublishTerminated")
self.loop.run()
return self
def nanPublishTerminated(self, publish_id, reason):
logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
if publish_id == self.publish_id:
self.publish_terminated = True
def start_publish(self, *args):
self.publish_id = iface.NANPublish({'srv_name': 'test service',
'srv_proto_type': 2,
'ssi': dbus.ByteArray(b'test')})
iface.NANUpdatePublish({'publish_id': self.publish_id,
'ssi': dbus.ByteArray(b'new')})
return False
def stop_publish(self, *args):
iface.NANCancelPublish(self.publish_id)
return False
def success(self):
return self.publish_terminated
with TestDbusNANUSD(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
def test_dbus_nan_usd_subscribe(dev, apdev):
"""D-Bus NAN USD subscribe"""
check_nan_usd_capab(dev[0])
(bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
class TestDbusNANUSD(TestDbus):
def __init__(self, bus):
TestDbus.__init__(self, bus)
self.subscribe_terminated = False
def __enter__(self):
gobject.timeout_add(1, self.start_subscribe)
gobject.timeout_add(500, self.stop_subscribe)
gobject.timeout_add(15000, self.timeout)
self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
"NANSubscribeTerminated")
self.loop.run()
return self
def nanSubscribeTerminated(self, subscribe_id, reason):
logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
if subscribe_id == self.subscribe_id:
self.subscribe_terminated = True
self.loop.quit()
def start_subscribe(self, *args):
self.subscribe_id = iface.NANSubscribe({'srv_name': 'test service',
'srv_proto_type': 2,
'ssi': dbus.ByteArray(b'test')})
return False
def stop_subscribe(self, *args):
iface.NANCancelSubscribe(self.subscribe_id)
return False
def success(self):
return self.subscribe_terminated
with TestDbusNANUSD(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
def test_dbus_nan_usd_subscribe_followup(dev, apdev):
"""D-Bus NAN USD subscribe and followup"""
check_nan_usd_capab(dev[0])
check_nan_usd_capab(dev[1])
(bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
class TestDbusNANUSD(TestDbus):
def __init__(self, bus):
TestDbus.__init__(self, bus)
self.subscribe_terminated = False
self.discovered = False
self.followup = False
def __enter__(self):
gobject.timeout_add(1, self.start_subscribe)
gobject.timeout_add(15000, self.timeout)
self.add_signal(self.nanDiscoveryResult, WPAS_DBUS_IFACE,
"NANDiscoveryResult")
self.add_signal(self.nanSubscribeTerminated, WPAS_DBUS_IFACE,
"NANSubscribeTerminated")
self.loop.run()
return self
def nanDiscoveryResult(self, args):
logger.debug("nanDiscoveryResult: %s" % str(args))
ssi = args['ssi']
publish_id = args['publish_id']
subscribe_id = args['subscribe_id']
peer_addr = args['peer_addr']
if publish_id == self.id1 and \
subscribe_id == self.subscribe_id and \
args['srv_proto_type'] == 3 and \
peer_addr == dev[1].own_addr() and \
len(ssi) == 2 and ssi[0] == 0x66 and ssi[1] == 0x77:
self.discovered = True
ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
if ev is None:
raise Exception("Automatically sent Follow-up message without ssi not seen")
iface.NANTransmit({'handle': subscribe_id,
'req_instance_id': publish_id,
'peer_addr': peer_addr,
'ssi': dbus.ByteArray(b'followup')})
ev = dev[1].wait_event(["NAN-RECEIVE"], timeout=5)
if ev is None:
raise Exception("Follow-up message not seen")
if "ssi=666f6c6c6f777570" not in ev.split(' '):
raise Exception("Expected SSI not seen in Follow-up")
self.followup = True
else:
logger.info("nanDiscoveryResult values did not match")
def nanSubscribeTerminated(self, subscribe_id, reason):
logger.debug("nanSubscribeTerminated: %d %s" % (subscribe_id, reason))
if subscribe_id == self.subscribe_id:
self.subscribe_terminated = True
self.loop.quit()
def start_subscribe(self, *args):
self.subscribe_id = iface.NANSubscribe({'srv_name': '_test',
'srv_proto_type': 3,
'ssi': dbus.ByteArray(b'test')})
cmd = "NAN_PUBLISH service_name=_test srv_proto_type=3 ssi=6677 ttl=10"
self.id1 = dev[1].request(cmd)
if "FAIL" in self.id1:
raise Exception("NAN_PUBLISH failed")
self.id1 = int(self.id1)
return False
def success(self):
return self.subscribe_terminated and self.discovered and \
self.followup
with TestDbusNANUSD(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
def test_dbus_nan_usd_publish_followup(dev, apdev):
"""D-Bus NAN USD publish and followup"""
check_nan_usd_capab(dev[0])
(bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
class TestDbusNANUSD(TestDbus):
def __init__(self, bus):
TestDbus.__init__(self, bus)
self.publish_terminated = False
self.receive = False
self.first_receive = True
self.followup = False
def __enter__(self):
gobject.timeout_add(1, self.start_publish)
gobject.timeout_add(15000, self.timeout)
self.add_signal(self.nanPublishTerminated, WPAS_DBUS_IFACE,
"NANPublishTerminated")
self.add_signal(self.nanReceive, WPAS_DBUS_IFACE,
"NANReceive")
self.loop.run()
return self
def nanPublishTerminated(self, publish_id, reason):
logger.debug("nanPublishTerminated: %d %s" % (publish_id, reason))
if publish_id == self.publish_id:
self.publish_terminated = True
self.loop.quit()
def nanReceive(self, args):
logger.debug("nanReceive: %s" % str(args))
self.receive = True
if self.first_receive:
self.first_receive = False
return
ssi = args['ssi']
if len(ssi) == 2 and ssi[0] == 0x88 and ssi[1] == 0x99:
self.followup = True
iface.NANTransmit({'handle': args['id'],
'req_instance_id': args['peer_id'],
'peer_addr': args['peer_addr'],
'ssi': dbus.ByteArray(b'followup')})
def start_publish(self, *args):
cmd = "NAN_SUBSCRIBE service_name=_test srv_proto_type=3 ssi=1122334455"
id1 = dev[1].request(cmd)
if "FAIL" in id1:
raise Exception("NAN_SUBSCRIBE failed")
self.publish_id = iface.NANPublish({'srv_name': '_test',
'srv_proto_type': 2,
'ssi': dbus.ByteArray(b'test')})
ev = dev[1].wait_event(["NAN-DISCOVERY-RESULT"], timeout=10)
if ev is None:
raise Exception("DiscoveryResult event not seen")
vals = split_nan_event(ev)
cmd = "NAN_TRANSMIT handle={} req_instance_id={} address={} ssi=8899".format(vals['subscribe_id'], vals['publish_id'], vals['address'])
if "FAIL" in dev[1].request(cmd):
raise Exception("NAN_TRANSMIT failed")
return False
def success(self):
return self.publish_terminated and self.receive and self.followup
with TestDbusNANUSD(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")