Add support for sensor data
Add a check_temperature() function that'll work for both the RM2 and the A1 sensor platform, and a check_sensors() function that returns the full set of sensor data for the A1 as a dict.
This commit is contained in:
parent
ef77bc7ea8
commit
69afd4ce52
1 changed files with 56 additions and 1 deletions
|
@ -168,10 +168,65 @@ class rm2:
|
||||||
packet[0] = 3
|
packet[0] = 3
|
||||||
self.send_packet(0x6a, packet)
|
self.send_packet(0x6a, packet)
|
||||||
|
|
||||||
|
def check_sensors(self):
|
||||||
|
packet = bytearray(16)
|
||||||
|
packet[0] = 1
|
||||||
|
response = self.send_packet(0x6a, packet)
|
||||||
|
err = ord(response[0x22]) | (ord(response[0x23]) << 8)
|
||||||
|
if err == 0:
|
||||||
|
data = {}
|
||||||
|
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
|
||||||
|
payload = aes.decrypt(str(response[0x38:]))
|
||||||
|
data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0
|
||||||
|
data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0
|
||||||
|
light = ord(payload[0x8])
|
||||||
|
if light == 0:
|
||||||
|
data['light'] = 'dark'
|
||||||
|
elif light == 1:
|
||||||
|
data['light'] = 'dim'
|
||||||
|
elif light == 2:
|
||||||
|
data['light'] = 'normal'
|
||||||
|
elif light == 3:
|
||||||
|
data['light'] = 'bright'
|
||||||
|
else:
|
||||||
|
data['light'] = 'unknown'
|
||||||
|
air_quality = ord(payload[0x0a])
|
||||||
|
if air_quality == 0:
|
||||||
|
data['air_quality'] = 'excellent'
|
||||||
|
elif air_quality == 1:
|
||||||
|
data['air_quality'] = 'good'
|
||||||
|
elif air_quality == 2:
|
||||||
|
data['air_quality'] = 'normal'
|
||||||
|
elif air_quality == 3:
|
||||||
|
data['air_quality'] = 'bad'
|
||||||
|
else:
|
||||||
|
data['air_quality'] = 'unknown'
|
||||||
|
noise = ord(payload[0xc])
|
||||||
|
if noise == 0:
|
||||||
|
data['noise'] = 'quiet'
|
||||||
|
elif noise == 1:
|
||||||
|
data['noise'] = 'normal'
|
||||||
|
elif noise == 2:
|
||||||
|
data['noise'] = 'noisy'
|
||||||
|
else:
|
||||||
|
data['noise'] = 'unknown'
|
||||||
|
return data
|
||||||
|
|
||||||
|
def check_temperature(self):
|
||||||
|
packet = bytearray(16)
|
||||||
|
packet[0] = 1
|
||||||
|
response = self.send_packet(0x6a, packet)
|
||||||
|
err = ord(response[0x22]) | (ord(response[0x23]) << 8)
|
||||||
|
if err == 0:
|
||||||
|
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
|
||||||
|
payload = aes.decrypt(str(response[0x38:]))
|
||||||
|
temp = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0
|
||||||
|
return temp
|
||||||
|
|
||||||
def check_data(self):
|
def check_data(self):
|
||||||
packet = bytearray(16)
|
packet = bytearray(16)
|
||||||
packet[0] = 4
|
packet[0] = 4
|
||||||
response = self.send_packet(0x6a, packet)
|
response = self.send_packet(0x6a, packet)
|
||||||
err = ord(response[0x22]) | (ord(response[0x23]) << 8)
|
err = ord(response[0x22]) | (ord(response[0x23]) << 8)
|
||||||
if err == 0:
|
if err == 0:
|
||||||
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
|
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
|
||||||
|
|
Loading…
Reference in a new issue