hostapd/wpa_supplicant/examples/p2p/p2p_connect.py

300 lines
7.5 KiB
Python
Raw Normal View History

#!/usr/bin/python
# Tests p2p_connect
# Will try to connect to another peer
# and form a group
######### MAY NEED TO RUN AS SUDO #############
import dbus
import sys, os
import time
import gobject
import getopt
from dbus.mainloop.glib import DBusGMainLoop
def usage():
print("Usage:")
print(" %s -i <interface_name> -m <wps_method> \ " \
% sys.argv[0])
print(" -a <addr> [-p <pin>] [-g <go_intent>] \ ")
print(" [-w <wpas_dbus_interface>]")
print("Options:")
print(" -i = interface name")
print(" -m = wps method")
print(" -a = peer address")
print(" -p = pin number (8 digits)")
print(" -g = group owner intent")
print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
print("Example:")
print(" %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % sys.argv[0])
# Required Signals
def GONegotiationSuccess(status):
print("Go Negotiation Success")
def GONegotiationFailure(status):
print('Go Negotiation Failed. Status:')
print(format(status))
os._exit(0)
def GroupStarted(properties):
if properties.has_key("group_object"):
print('Group Formation Complete %s' \
% properties["group_object"])
os._exit(0)
def WpsFailure(status, etc):
print("WPS Authentication Failure".format(status))
print(etc)
os._exit(0)
class P2P_Connect():
# Needed Variables
global bus
global wpas_object
global interface_object
global p2p_interface
global ifname
global wpas
global wpas_dbus_interface
global timeout
global path
global wps_method
global go_intent
global addr
global pin
# Dbus Paths
global wpas_dbus_opath
global wpas_dbus_interfaces_opath
global wpas_dbus_interfaces_interface
global wpas_dbus_interfaces_p2pdevice
# Dictionary of Arguements
global p2p_connect_arguements
# Constructor
def __init__(self,ifname,wpas_dbus_interface,addr,
pin,wps_method,go_intent):
# Initializes variables and threads
self.ifname = ifname
self.wpas_dbus_interface = wpas_dbus_interface
self.wps_method = wps_method
self.go_intent = go_intent
self.addr = addr
self.pin = pin
# Generating interface/object paths
self.wpas_dbus_opath = \
"/" + self.wpas_dbus_interface.replace(".","/")
self.wpas_wpas_dbus_interfaces_opath = \
self.wpas_dbus_opath + "/Interfaces"
self.wpas_dbus_interfaces_interface = \
self.wpas_dbus_interface + ".Interface"
self.wpas_dbus_interfaces_p2pdevice = \
self.wpas_dbus_interfaces_interface + ".P2PDevice"
# Getting interfaces and objects
DBusGMainLoop(set_as_default=True)
self.bus = dbus.SystemBus()
self.wpas_object = self.bus.get_object(
self.wpas_dbus_interface,
self.wpas_dbus_opath)
self.wpas = dbus.Interface(
self.wpas_object, self.wpas_dbus_interface)
# See if wpa_supplicant already knows about this interface
self.path = None
try:
self.path = self.wpas.GetInterface(ifname)
except dbus.DBusException as exc:
if not str(exc).startswith(
self.wpas_dbus_interface + \
".InterfaceUnknown:"):
raise exc
try:
path = self.wpas.CreateInterface(
{'Ifname': ifname, 'Driver': 'test'})
time.sleep(1)
except dbus.DBusException as exc:
if not str(exc).startswith(
self.wpas_dbus_interface + \
".InterfaceExists:"):
raise exc
# Get Interface and objects
self.interface_object = self.bus.get_object(
self.wpas_dbus_interface,self.path)
self.p2p_interface = dbus.Interface(
self.interface_object,
self.wpas_dbus_interfaces_p2pdevice)
# Add signals
self.bus.add_signal_receiver(GONegotiationSuccess,
dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
signal_name="GONegotiationSuccess")
self.bus.add_signal_receiver(GONegotiationFailure,
dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
signal_name="GONegotiationFailure")
self.bus.add_signal_receiver(GroupStarted,
dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
signal_name="GroupStarted")
self.bus.add_signal_receiver(WpsFailure,
dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
signal_name="WpsFailed")
#Constructing all the arguements needed to connect
def constructArguements(self):
# Adding required arguements
self.p2p_connect_arguements = {'wps_method':self.wps_method,
'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}
# Display requires a pin, and a go intent of 15
if (self.wps_method == 'display'):
if (self.pin != None):
self.p2p_connect_arguements.update({'pin':self.pin})
else:
print("Error:\n Pin required for wps_method=display")
usage()
quit()
if (self.go_intent != None and int(self.go_intent) != 15):
print("go_intent overwritten to 15")
self.go_intent = '15'
# Keypad requires a pin, and a go intent of less than 15
elif (self.wps_method == 'keypad'):
if (self.pin != None):
self.p2p_connect_arguements.update({'pin':self.pin})
else:
print("Error:\n Pin required for wps_method=keypad")
usage()
quit()
if (self.go_intent != None and int(self.go_intent) == 15):
error = "Error :\n Group Owner intent cannot be" + \
" 15 for wps_method=keypad"
print(error)
usage()
quit()
# Doesn't require pin
# for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
elif (self.wps_method == 'pin'):
if (self.pin != None):
print("pin ignored")
# No pin is required for pbc so it is ignored
elif (self.wps_method == 'pbc'):
if (self.pin != None):
print("pin ignored")
else:
print("Error:\n wps_method not supported or does not exist")
usage()
quit()
# Go_intent is optional for all arguements
if (self.go_intent != None):
self.p2p_connect_arguements.update(
{'go_intent':dbus.Int32(self.go_intent)})
# Running p2p_connect
def run(self):
try:
result_pin = self.p2p_interface.Connect(
self.p2p_connect_arguements)
except dbus.DBusException as exc:
raise exc
if (self.wps_method == 'pin' and \
not self.p2p_connect_arguements.has_key('pin') ):
print("Connect return with pin value of %d " % int(result_pin))
gobject.MainLoop().run()
if __name__ == "__main__":
# Required
interface_name = None
wps_method = None
addr = None
# Conditionally optional
pin = None
# Optional
wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
go_intent = None
# Using getopts to handle options
try:
options, args = getopt.getopt(sys.argv[1:],"hi:m:a:p:g:w:")
except getopt.GetoptError:
usage()
quit()
# If theres a switch, override default option
for key, value in options:
# Help
if (key == "-h"):
usage()
quit()
# Interface Name
elif (key == "-i"):
interface_name = value
# WPS Method
elif (key == "-m"):
wps_method = value
# Address
elif (key == "-a"):
addr = value
# Pin
elif (key == "-p"):
pin = value
# Group Owner Intent
elif (key == "-g"):
go_intent = value
# Dbus interface
elif (key == "-w"):
wpas_dbus_interface = value
else:
assert False, "unhandled option"
# Required Arguements check
if (interface_name == None or wps_method == None or addr == None):
print("Error:\n Required arguements not specified")
usage()
quit()
# Group Owner Intent Check
if (go_intent != None and (int(go_intent) > 15 or int(go_intent) < 0) ):
print("Error:\n Group Owner Intent must be between 0 and 15 inclusive")
usage()
quit()
# Pin Check
if (pin != None and len(pin) != 8):
print("Error:\n Pin is not 8 digits")
usage()
quit()
try:
p2p_connect_test = P2P_Connect(interface_name,wpas_dbus_interface,
addr,pin,wps_method,go_intent)
except:
print("Error:\n Invalid Arguements")
usage()
quit()
p2p_connect_test.constructArguements()
p2p_connect_test.run()
os._exit(0)