Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .cover import dooya
from .device import device, ping, scan
from .exceptions import exception
from .light import lb1
from .light import lb1, lb27r1
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
Expand Down Expand Up @@ -103,6 +103,7 @@
0x60C7: (lb1, "LB1", "Broadlink"),
0x60C8: (lb1, "LB1", "Broadlink"),
0x6112: (lb1, "LB1", "Broadlink"),
0xA4F4: (lb27r1, "LB27 R1", "Broadlink"),
0x2722: (S1C, "S2KIT", "Broadlink"),
0x4EAD: (hysen, "HY02B05H", "Hysen"),
0x4E4D: (dooya, "DT360E-45/20", "Dooya"),
Expand Down
119 changes: 106 additions & 13 deletions broadlink/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import enum
import json
import struct
import typing
import typing as t

from .device import device
from .exceptions import check_error
Expand Down Expand Up @@ -58,32 +58,32 @@ def set_state(
if green is not None:
state["green"] = int(green)
if brightness is not None:
state["brightness"] = brightness
state["brightness"] = int(brightness)
if colortemp is not None:
state["colortemp"] = colortemp
state["colortemp"] = int(colortemp)
if hue is not None:
state["hue"] = hue
state["hue"] = int(hue)
if saturation is not None:
state["saturation"] = saturation
state["saturation"] = int(saturation)
if transitionduration is not None:
state["transitionduration"] = transitionduration
state["transitionduration"] = int(transitionduration)
if maxworktime is not None:
state["maxworktime"] = maxworktime
state["maxworktime"] = int(maxworktime)
if bulb_colormode is not None:
state["bulb_colormode"] = bulb_colormode
state["bulb_colormode"] = int(bulb_colormode)
if bulb_scenes is not None:
state["bulb_scenes"] = bulb_scenes
state["bulb_scenes"] = str(bulb_scenes)
if bulb_scene is not None:
state["bulb_scene"] = bulb_scene
state["bulb_scene"] = str(bulb_scene)
if bulb_sceneidx is not None:
state["bulb_sceneidx"] = bulb_sceneidx
state["bulb_sceneidx"] = int(bulb_sceneidx)

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, obj: typing.Any) -> bytes:
def _encode(self, flag: int, obj: t.Any) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(14)
Expand All @@ -97,9 +97,102 @@ def _encode(self, flag: int, obj: typing.Any) -> bytes:
packet[0x6:0x8] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> typing.Any:
def _decode(self, response: bytes) -> t.Any:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE : 0xE + js_len])
return state


class lb27r1(device):
"""Controls a Broadlink LB27 R1."""

TYPE = "LB27R1"

@enum.unique
class ColorMode(enum.IntEnum):
"""Enumerates color modes."""
RGB = 0
WHITE = 1
SCENE = 2

def get_state(self) -> dict:
"""Return the power state of the device.

Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if red is not None:
state["red"] = int(red)
if blue is not None:
state["blue"] = int(blue)
if green is not None:
state["green"] = int(green)
if brightness is not None:
state["brightness"] = int(brightness)
if colortemp is not None:
state["colortemp"] = int(colortemp)
if hue is not None:
state["hue"] = int(hue)
if saturation is not None:
state["saturation"] = int(saturation)
if transitionduration is not None:
state["transitionduration"] = int(transitionduration)
if maxworktime is not None:
state["maxworktime"] = int(maxworktime)
if bulb_colormode is not None:
state["bulb_colormode"] = int(bulb_colormode)
if bulb_scenes is not None:
state["bulb_scenes"] = str(bulb_scenes)
if bulb_scene is not None:
state["bulb_scene"] = str(bulb_scene)

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, obj: t.Any) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
js = json.dumps(obj, separators=[',', ':']).encode()
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
)
packet += js
checksum = sum(packet[0x6:], 0xC0AD) & 0xFFFF
packet[0x4:0x6] = checksum.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> t.Any:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x8)[0]
state = json.loads(payload[0xC : 0xC + js_len])
return state