Add support for 0x5f36 devices and RM4 series (#317)

* Add support for 0x5f36 devices

This type of device requires a header in the payload. The rest is the same.

* Improve request header assignment

* Change code sending header

I just found out that this device uses a different header for sending codes. This update addresses this issue.

* Improve authentication

Use the error code to check if the authentication was successful.

* Use default value when devtype is None

* Use generic remote type if devtype is None

* Extend support to RM4 series

I just realized that RM4 devices use the same header. I will take the opportunity to extend support to these devices as well.

* Add device type 0x62be and create rm4 class 

The rm4 class will improve code scalability. Just add the RM4 type to this class and it will just work.

* Remove comma
This commit is contained in:
Felipe Martins Diel 2020-03-16 05:49:41 -03:00 committed by GitHub
parent af95fa2446
commit 1a1169f1a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -44,9 +44,14 @@ def gendevice(devtype, host, mac):
0x27a6, # RM2 Pro PP 0x27a6, # RM2 Pro PP
0x278f, # RM Mini Shate 0x278f, # RM Mini Shate
0x27c2, # RM Mini 3 0x27c2, # RM Mini 3
0x27d1, #new RM Mini3 0x27d1, # new RM Mini3
0x27de, # RM Mini 3 (C) 0x27de # RM Mini 3 (C)
], ],
rm4: [0x51da, # RM4b
0x5f36, # RM Mini 3
0x610f, # RM4c
0x62be # RM4c
],
a1: [0x2714], # A1 a1: [0x2714], # A1
mp1: [0x4EB5, # MP1 mp1: [0x4EB5, # MP1
0x4EF7 # Honyar oem mp1 0x4EF7 # Honyar oem mp1
@ -146,7 +151,7 @@ class device:
def __init__(self, host, mac, devtype, timeout=10): def __init__(self, host, mac, devtype, timeout=10):
self.host = host self.host = host
self.mac = mac.encode() if isinstance(mac, str) else mac self.mac = mac.encode() if isinstance(mac, str) else mac
self.devtype = devtype self.devtype = devtype if devtype is not None else 0x272a
self.timeout = timeout self.timeout = timeout
self.count = random.randrange(0xffff) self.count = random.randrange(0xffff)
self.iv = bytearray( self.iv = bytearray(
@ -204,11 +209,11 @@ class device:
payload[0x36] = ord('1') payload[0x36] = ord('1')
response = self.send_packet(0x65, payload) response = self.send_packet(0x65, payload)
payload = self.decrypt(response[0x38:]) if any(response[0x22:0x24]):
if not payload:
return False return False
payload = self.decrypt(response[0x38:])
key = payload[0x04:0x14] key = payload[0x04:0x14]
if len(key) % 16 != 0: if len(key) % 16 != 0:
@ -233,8 +238,8 @@ class device:
packet[0x05] = 0xa5 packet[0x05] = 0xa5
packet[0x06] = 0xaa packet[0x06] = 0xaa
packet[0x07] = 0x55 packet[0x07] = 0x55
packet[0x24] = 0x2a packet[0x24] = self.devtype & 0xff
packet[0x25] = 0x27 packet[0x25] = self.devtype >> 8
packet[0x26] = command packet[0x26] = command
packet[0x28] = self.count & 0xff packet[0x28] = self.count & 0xff
packet[0x29] = self.count >> 8 packet[0x29] = self.count >> 8
@ -251,8 +256,8 @@ class device:
# pad the payload for AES encryption # pad the payload for AES encryption
if payload: if payload:
payload += bytearray(((len(payload)-1)//16+1)*16 - len(payload)) payload += bytearray(16 - len(payload)%16)
checksum = adler32(payload, 0xbeaf) & 0xffff checksum = adler32(payload, 0xbeaf) & 0xffff
packet[0x34] = checksum & 0xff packet[0x34] = checksum & 0xff
packet[0x35] = checksum >> 8 packet[0x35] = checksum >> 8
@ -571,76 +576,88 @@ class rm(device):
def __init__(self, host, mac, devtype): def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype) device.__init__(self, host, mac, devtype)
self.type = "RM2" self.type = "RM2"
self._request_header = bytes()
self._code_sending_header = bytes()
def check_data(self): def check_data(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 4 packet.append(0x04)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) err = response[0x22] | (response[0x23] << 8)
if err != 0: if err != 0:
return None return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
return payload[0x04:] return payload[len(self._request_header) + 4:]
def send_data(self, data): def send_data(self, data):
packet = bytearray([0x02, 0x00, 0x00, 0x00]) packet = bytearray(self._code_sending_header)
packet += bytes([0x02, 0x00, 0x00, 0x00])
packet += data packet += data
self.send_packet(0x6a, packet) self.send_packet(0x6a, packet)
def enter_learning(self): def enter_learning(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 3 packet.append(0x03)
self.send_packet(0x6a, packet) self.send_packet(0x6a, packet)
def sweep_frequency(self): def sweep_frequency(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 0x19 packet.append(0x19)
self.send_packet(0x6a, packet) self.send_packet(0x6a, packet)
def cancel_sweep_frequency(self): def cancel_sweep_frequency(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 0x1e packet.append(0x1e)
self.send_packet(0x6a, packet) self.send_packet(0x6a, packet)
def check_frequency(self): def check_frequency(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 0x1a packet.append(0x1a)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) err = response[0x22] | (response[0x23] << 8)
if err != 0: if err != 0:
return False return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if payload[0x04] == 1: if payload[len(self._request_header) + 4] == 1:
return True return True
return False return False
def find_rf_packet(self): def find_rf_packet(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 0x1b packet.append(0x1b)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) err = response[0x22] | (response[0x23] << 8)
if err != 0: if err != 0:
return False return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if payload[0x04] == 1: if payload[len(self._request_header) + 4] == 1:
return True return True
return False return False
def check_temperature(self): def check_temperature(self):
packet = bytearray(16) packet = bytearray(self._request_header)
packet[0] = 1 packet.append(0x01)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) err = response[0x22] | (response[0x23] << 8)
if err != 0: if err != 0:
return False return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): temp_pos = len(self._request_header) + 4
temp = (payload[0x4] * 10 + payload[0x5]) / 10.0 if isinstance(payload[temp_pos], int):
temp = (payload[temp_pos] * 10 + payload[temp_pos+1]) / 10.0
else: else:
temp = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 temp = (ord(payload[temp_pos]) * 10 + ord(payload[temp_pos+1])) / 10.0
return temp return temp
class rm4(rm):
def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype)
self.type = "RM4"
self._request_header = b'\x04\x00'
self._code_sending_header = b'\xd0\x00'
# For legacy compatibility - don't use this # For legacy compatibility - don't use this
class rm2(rm): class rm2(rm):
def __init__(self): def __init__(self):