Merge branch 'rf_experiment' into rf_experiment_v0.9

# Conflicts:
#	README.md
#	broadlink/__init__.py
#	cli/broadlink_cli
This commit is contained in:
Sergey Prilukin 2018-11-26 00:21:37 +02:00
commit 2b4e6d91ff
No known key found for this signature in database
GPG key ID: 4918961AE5C05730
3 changed files with 135 additions and 36 deletions

View file

@ -43,6 +43,27 @@ Enter learning mode:
devices[0].enter_learning() devices[0].enter_learning()
``` ```
Sweep RF frequencies:
```
devices[0].sweep_frequency()
```
Cancel sweep RF frequencies:
```
devices[0].cancel_sweep_frequency()
```
Check whether a frequency has been found:
```
found = devices[0].check_frequency()
```
(This will return True if the RM has locked onto a frequency, False otherwise)
Attempt to learn an RF packet:
```
found = devices[0].find_rf_packet()
```
(This will return True if a packet has been found, False otherwise)
Obtain an IR or RF packet while in learning mode: Obtain an IR or RF packet while in learning mode:
``` ```
ir_packet = devices[0].check_data() ir_packet = devices[0].check_data()

View file

@ -545,6 +545,38 @@ class rm(device):
packet[0] = 3 packet[0] = 3
self.send_packet(0x6a, packet) self.send_packet(0x6a, packet)
def sweep_frequency(self):
packet = bytearray(16)
packet[0] = 0x19
self.send_packet(0x6a, packet)
def cancel_sweep_frequency(self):
packet = bytearray(16)
packet[0] = 0x1e
self.send_packet(0x6a, packet)
def check_frequency(self):
packet = bytearray(16)
packet[0] = 0x1a
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err == 0:
payload = self.decrypt(bytes(response[0x38:]))
if payload[0x04] == 1:
return True
return False
def find_rf_packet(self):
packet = bytearray(16)
packet[0] = 0x1b
response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8)
if err == 0:
payload = self.decrypt(bytes(response[0x38:]))
if payload[0x04] == 1:
return True
return False
def check_temperature(self): def check_temperature(self):
packet = bytearray(16) packet = bytearray(16)
packet[0] = 1 packet[0] = 1
@ -576,19 +608,19 @@ class hysen(device):
self.type = "Hysen heating controller" self.type = "Hysen heating controller"
# Send a request # Send a request
# input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00])
# Returns decrypted payload # Returns decrypted payload
# New behaviour: raises a ValueError if the device response indicates an error or CRC check fails # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails
# The function prepends length (2 bytes) and appends CRC # The function prepends length (2 bytes) and appends CRC
def send_request(self,input_payload): def send_request(self,input_payload):
from PyCRC.CRC16 import CRC16 from PyCRC.CRC16 import CRC16
crc = CRC16(modbus_flag=True).calculate(bytes(input_payload)) crc = CRC16(modbus_flag=True).calculate(bytes(input_payload))
# first byte is length, +2 for CRC16 # first byte is length, +2 for CRC16
request_payload = bytearray([len(input_payload) + 2,0x00]) request_payload = bytearray([len(input_payload) + 2,0x00])
request_payload.extend(input_payload) request_payload.extend(input_payload)
# append CRC # append CRC
request_payload.append(crc & 0xFF) request_payload.append(crc & 0xFF)
request_payload.append((crc >> 8) & 0xFF) request_payload.append((crc >> 8) & 0xFF)
@ -598,9 +630,9 @@ class hysen(device):
# check for error # check for error
err = response[0x22] | (response[0x23] << 8) err = response[0x22] | (response[0x23] << 8)
if err: if err:
raise ValueError('broadlink_response_error',err) raise ValueError('broadlink_response_error',err)
response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) response_payload = bytearray(self.decrypt(bytes(response[0x38:])))
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
@ -610,9 +642,9 @@ class hysen(device):
crc = CRC16(modbus_flag=True).calculate(bytes(response_payload[2:response_payload_len])) crc = CRC16(modbus_flag=True).calculate(bytes(response_payload[2:response_payload_len]))
if (response_payload[response_payload_len] == crc & 0xFF) and (response_payload[response_payload_len+1] == (crc >> 8) & 0xFF): if (response_payload[response_payload_len] == crc & 0xFF) and (response_payload[response_payload_len+1] == (crc >> 8) & 0xFF):
return response_payload[2:response_payload_len] return response_payload[2:response_payload_len]
else: else:
raise ValueError('hysen_response_error','CRC check on response failed') raise ValueError('hysen_response_error','CRC check on response failed')
# Get current room temperature in degrees celsius # Get current room temperature in degrees celsius
def get_temp(self): def get_temp(self):
@ -626,7 +658,7 @@ class hysen(device):
# Get full status (including timer schedule) # Get full status (including timer schedule)
def get_full_status(self): def get_full_status(self):
payload = self.send_request(bytearray([0x01,0x03,0x00,0x00,0x00,0x16])) payload = self.send_request(bytearray([0x01,0x03,0x00,0x00,0x00,0x16]))
data = {} data = {}
data['remote_lock'] = payload[3] & 1 data['remote_lock'] = payload[3] & 1
data['power'] = payload[4] & 1 data['power'] = payload[4] & 1
@ -652,11 +684,11 @@ class hysen(device):
data['min'] = payload[20] data['min'] = payload[20]
data['sec'] = payload[21] data['sec'] = payload[21]
data['dayofweek'] = payload[22] data['dayofweek'] = payload[22]
weekday = [] weekday = []
for i in range(0, 6): for i in range(0, 6):
weekday.append({'start_hour':payload[2*i + 23], 'start_minute':payload[2*i + 24],'temp':payload[i + 39]/2.0}) weekday.append({'start_hour':payload[2*i + 23], 'start_minute':payload[2*i + 24],'temp':payload[i + 39]/2.0})
data['weekday'] = weekday data['weekday'] = weekday
weekend = [] weekend = []
for i in range(6, 8): for i in range(6, 8):
@ -693,7 +725,7 @@ class hysen(device):
# For backwards compatibility only. Prefer calling set_mode directly. Note this function invokes loop_mode=0 and sensor=0. # For backwards compatibility only. Prefer calling set_mode directly. Note this function invokes loop_mode=0 and sensor=0.
def switch_to_auto(self): def switch_to_auto(self):
self.set_mode(auto_mode=1, loop_mode=0) self.set_mode(auto_mode=1, loop_mode=0)
def switch_to_manual(self): def switch_to_manual(self):
self.set_mode(auto_mode=0, loop_mode=0) self.set_mode(auto_mode=0, loop_mode=0)

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python3
import broadlink import broadlink
import sys import sys
@ -79,7 +79,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on
parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--send", action="store_true", help="send command")
parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--sensors", action="store_true", help="check all sensors")
parser.add_argument("--learn", action="store_true", help="learn command") parser.add_argument("--learn", action="store_true", help="learn command")
parser.add_argument("--learnfile", help="learn command and save to specified file") parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning")
parser.add_argument("--learnfile", help="save learned command to a specified file")
parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format")
parser.add_argument("--convert", action="store_true", help="convert input data to durations") parser.add_argument("--convert", action="store_true", help="convert input data to durations")
parser.add_argument("data", nargs='*', help="Data to send or convert") parser.add_argument("data", nargs='*', help="Data to send or convert")
@ -102,11 +103,11 @@ if args.host or args.device:
if args.convert: if args.convert:
data = bytearray.fromhex(''.join(args.data)) data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data) durations = to_microseconds(data)
print format_durations(durations) print(format_durations(durations))
if args.temperature: if args.temperature:
print dev.check_temperature() print(dev.check_temperature())
if args.energy: if args.energy:
print dev.get_energy() print(dev.get_energy())
if args.sensors: if args.sensors:
try: try:
data = dev.check_sensors() data = dev.check_sensors()
@ -114,15 +115,15 @@ if args.sensors:
data = {} data = {}
data['temperature'] = dev.check_temperature() data['temperature'] = dev.check_temperature()
for key in data: for key in data:
print "{} {}".format(key, data[key]) print("{} {}".format(key, data[key]))
if args.send: if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
if args.durations else bytearray.fromhex(''.join(args.data)) if args.durations else bytearray.fromhex(''.join(args.data))
dev.send_data(data) dev.send_data(data)
if args.learn or args.learnfile: if args.learn:
dev.enter_learning() dev.enter_learning()
data = None data = None
print "Learning..." print("Learning...")
timeout = 30 timeout = 30
while (data is None) and (timeout > 0): while (data is None) and (timeout > 0):
time.sleep(2) time.sleep(2)
@ -133,51 +134,96 @@ if args.learn or args.learnfile:
if args.durations \ if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data)) else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn: if args.learn:
print learned print(learned)
if args.learnfile: if args.learnfile:
print "Saving to {}".format(args.learnfile) print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file: with open(args.learnfile, "w") as text_file:
text_file.write(learned) text_file.write(learned)
else: else:
print "No data received..." print("No data received...")
if args.check: if args.check:
if dev.check_power(): if dev.check_power():
print '* ON *' print('* ON *')
else: else:
print '* OFF *' print('* OFF *')
if args.checknl: if args.checknl:
if dev.check_nightlight(): if dev.check_nightlight():
print '* ON *' print('* ON *')
else: else:
print '* OFF *' print('* OFF *')
if args.turnon: if args.turnon:
dev.set_power(True) dev.set_power(True)
if dev.check_power(): if dev.check_power():
print '== Turned * ON * ==' print('== Turned * ON * ==')
else: else:
print '!! Still OFF !!' print('!! Still OFF !!')
if args.turnoff: if args.turnoff:
dev.set_power(False) dev.set_power(False)
if dev.check_power(): if dev.check_power():
print '!! Still ON !!' print('!! Still ON !!')
else: else:
print '== Turned * OFF * ==' print('== Turned * OFF * ==')
if args.turnnlon: if args.turnnlon:
dev.set_nightlight(True) dev.set_nightlight(True)
if dev.check_nightlight(): if dev.check_nightlight():
print '== Turned * ON * ==' print('== Turned * ON * ==')
else: else:
print '!! Still OFF !!' print('!! Still OFF !!')
if args.turnnloff: if args.turnnloff:
dev.set_nightlight(False) dev.set_nightlight(False)
if dev.check_nightlight(): if dev.check_nightlight():
print '!! Still ON !!' print('!! Still ON !!')
else: else:
print '== Turned * OFF * ==' print('== Turned * OFF * ==')
if args.switch: if args.switch:
if dev.check_power(): if dev.check_power():
dev.set_power(False) dev.set_power(False)
print '* Switch to OFF *' print('* Switch to OFF *')
else: else:
dev.set_power(True) dev.set_power(True)
print '* Switch to ON *' print('* Switch to ON *')
if args.rfscanlearn:
dev.sweep_frequency()
print("Learning RF Frequency, press and hold the button to learn...")
timeout = 20
while (not dev.check_frequency()) and (timeout > 0):
time.sleep(1)
timeout -= 1
if timeout <= 0:
print("RF Frequency not found")
dev.cancel_sweep_frequency()
exit(1)
print("Found RF Frequency - 1 of 2!")
print("You can now let go of the button")
input("Press enter to continue...")
print("To complete learning, single press the button you want to learn")
dev.find_rf_packet()
data = None
timeout = 20
while (data is None) and (timeout > 0):
time.sleep(1)
timeout -= 1
data = dev.check_data()
if data:
print("Found RF Frequency - 2 of 2!")
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learnfile is None:
print(learned)
if args.learnfile is not None:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
else:
print("No data received...")