Merge pull request #290 from barnybug/add_bgelectrical_sockets

Add support for BG Electrical Smart Sockets
This commit is contained in:
Matthew Garrett 2020-01-29 16:18:00 -08:00 committed by GitHub
commit dfd478922b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,8 +1,10 @@
#!/usr/bin/python #!/usr/bin/python
import codecs import codecs
import json
import random import random
import socket import socket
import struct
import threading import threading
import time import time
from datetime import datetime from datetime import datetime
@ -48,7 +50,8 @@ def gendevice(devtype, host, mac):
], ],
hysen: [0x4EAD], # Hysen controller hysen: [0x4EAD], # Hysen controller
S1C: [0x2722], # S1 (SmartOne Alarm Kit) S1C: [0x2722], # S1 (SmartOne Alarm Kit)
dooya: [0x4E4D] # Dooya DT360E (DOOYA_CURTAIN_V2) dooya: [0x4E4D], # Dooya DT360E (DOOYA_CURTAIN_V2)
bg1: [0x51E3] # BG Electrical Smart Power Socket
} }
# Look for the class associated to devtype in devices # Look for the class associated to devtype in devices
@ -349,6 +352,75 @@ class mp1(device):
return data return data
class bg1(device):
def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype)
self.type = "BG1"
def get_state(self):
"""Get state of device.
Returns:
dict: Dictionary of current state
eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`"""
packet = self._encode(1, b'{}')
response = self.send_packet(0x6a, packet)
return self._decode(response)
def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None):
data = {}
if pwr is not None:
data['pwr'] = int(bool(pwr))
if pwr1 is not None:
data['pwr1'] = int(bool(pwr1))
if pwr2 is not None:
data['pwr2'] = int(bool(pwr2))
if maxworktime is not None:
data['maxworktime'] = maxworktime
if maxworktime1 is not None:
data['maxworktime1'] = maxworktime1
if maxworktime2 is not None:
data['maxworktime2'] = maxworktime2
if idcbrightness is not None:
data['idcbrightness'] = idcbrightness
js = json.dumps(data).encode('utf8')
packet = self._encode(2, js)
response = self.send_packet(0x6a, packet)
return self._decode(response)
def _encode(self, flag, js):
# packet format is:
# 0x00-0x01 length
# 0x02-0x05 header
# 0x06-0x07 00
# 0x08 flag (1 for read or 2 write?)
# 0x09 unknown (0xb)
# 0x0a-0x0d length of json
# 0x0e- json data
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(js)
struct.pack_into('<HHHHBBI', packet, 0, length, 0xa5a5, 0x5a5a, 0x0000, flag, 0x0b, len(js))
for i in range(len(js)):
packet.append(js[i])
checksum = 0xc0ad
for c in packet[0x08:]:
checksum = (checksum + c) & 0xffff
packet[0x06] = checksum & 0xff
packet[0x07] = checksum >> 8
return packet
def _decode(self, response):
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:]))
js_len = struct.unpack_from('<I', payload, 0x0a)[0]
state = json.loads(payload[0x0e:0x0e+js_len])
return state
class sp1(device): class sp1(device):
def __init__(self, host, mac, devtype): def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype) device.__init__(self, host, mac, devtype)