Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions broadlink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#!/usr/bin/env python3
"""The python-broadlink library."""
import socket
from typing import Generator, List, Tuple, Union
import typing as t

from . import exceptions as e
from .alarm import S1C
from .climate import hysen
from .cover import dooya
from .device import device, ping, scan
from .exceptions import exception
from .light import lb1, lb27r1
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
Expand Down Expand Up @@ -115,8 +115,8 @@

def gendevice(
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
name: str = None,
is_locked: bool = None,
) -> device:
Expand Down Expand Up @@ -151,15 +151,15 @@ def hello(
try:
return next(xdiscover(timeout, local_ip_address, host, port))
except StopIteration:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.


def discover(
timeout: int = 10,
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> List[device]:
) -> t.List[device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
return [gendevice(*resp) for resp in responses]
Expand All @@ -170,7 +170,7 @@ def xdiscover(
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[device, None, None]:
) -> t.Generator[device, None, None]:
"""Discover devices connected to the local network.

This function returns a generator that yields devices instantly.
Expand Down
4 changes: 2 additions & 2 deletions broadlink/alarm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Support for alarm kits."""
from . import exceptions as e
from .device import device
from .exceptions import check_error


class S1C(device):
Expand All @@ -19,7 +19,7 @@ def get_sensors_status(self) -> dict:
packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if not payload:
return None
Expand Down
4 changes: 2 additions & 2 deletions broadlink/climate.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Support for climate control."""
from typing import List

from . import exceptions as e
from .device import device
from .exceptions import check_error
from .helpers import calculate_crc16


Expand Down Expand Up @@ -31,7 +31,7 @@ def send_request(self, input_payload: bytes) -> bytes:

# send to device
response = self.send_packet(0x6A, request_payload)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
response_payload = self.decrypt(response[0x38:])

# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
Expand Down
4 changes: 2 additions & 2 deletions broadlink/cover.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Support for covers."""
import time

from . import exceptions as e
from .device import device
from .exceptions import check_error


class dooya(device):
Expand All @@ -20,7 +20,7 @@ def _send(self, magic1: int, magic2: int) -> int:
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[4]

Expand Down
30 changes: 15 additions & 15 deletions broadlink/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@
import threading
import random
import time
from typing import Generator, Tuple, Union
import typing as t

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from .exceptions import check_error, exception
from . import exceptions as e
from .protocol import Datetime

HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]


def scan(
timeout: int = 10,
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[HelloResponse, None, None]:
) -> t.Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand Down Expand Up @@ -94,8 +94,8 @@ class device:

def __init__(
self,
host: Tuple[str, int],
mac: Union[bytes, str],
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
devtype: int,
timeout: int = 10,
name: str = "",
Expand Down Expand Up @@ -184,7 +184,7 @@ def auth(self) -> bool:
payload[0x30:0x36] = "Test 1".encode()

response = self.send_packet(0x65, payload)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])

key = payload[0x04:0x14]
Expand All @@ -209,10 +209,10 @@ def hello(self, local_ip_address=None) -> bool:
try:
devtype, host, mac, name, is_locked = next(responses)
except StopIteration:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.

if (devtype, host, mac) != (self.devtype, self.host, self.mac):
raise exception(-2040) # Device information is not intact.
raise e.exception(-2040) # Device information is not intact.

self.name = name
self.is_locked = is_locked
Expand All @@ -231,7 +231,7 @@ def get_fwversion(self) -> int:
"""Get firmware version."""
packet = bytearray([0x68])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x4] | payload[0x5] << 8

Expand All @@ -242,7 +242,7 @@ def set_name(self, name: str) -> None:
packet += bytearray(0x50 - len(packet))
packet[0x43] = self.is_locked
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.name = name

def set_lock(self, state: bool) -> None:
Expand All @@ -252,7 +252,7 @@ def set_lock(self, state: bool) -> None:
packet += bytearray(0x50 - len(packet))
packet[0x43] = bool(state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.is_locked = bool(state)

def get_type(self) -> str:
Expand Down Expand Up @@ -294,13 +294,13 @@ def send_packet(self, packet_type: int, payload: bytes) -> bytes:
break
except socket.timeout:
if (time.time() - start_time) > timeout:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.

if len(resp) < 0x30:
raise exception(-4007) # Length error.
raise e.exception(-4007) # Length error.

checksum = int.from_bytes(resp[0x20:0x22], "little")
if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum:
raise exception(-4008) # Checksum error.
raise e.exception(-4008) # Checksum error.

return resp
10 changes: 5 additions & 5 deletions broadlink/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import struct
import typing as t

from . import exceptions as e
from .device import device
from .exceptions import check_error


class lb1(device):
Expand All @@ -27,7 +27,7 @@ def get_state(self) -> dict:
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
Expand Down Expand Up @@ -80,7 +80,7 @@ def set_state(

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, obj: t.Any) -> bytes:
Expand Down Expand Up @@ -124,7 +124,7 @@ def get_state(self) -> dict:
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)

def set_state(
Expand Down Expand Up @@ -174,7 +174,7 @@ def set_state(

packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)

def _encode(self, flag: int, obj: t.Any) -> bytes:
Expand Down
6 changes: 3 additions & 3 deletions broadlink/remote.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Support for universal remotes."""
import struct

from . import exceptions as e
from .device import device
from .exceptions import check_error


class rmmini(device):
Expand All @@ -14,7 +14,7 @@ def _send(self, command: int, data: bytes = b'') -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<I", command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[0x4:]

Expand Down Expand Up @@ -79,7 +79,7 @@ def _send(self, command: int, data: bytes = b'') -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<HI", len(data) + 4, command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6:p_len+2]
Expand Down
4 changes: 2 additions & 2 deletions broadlink/sensor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Support for sensors."""
import struct

from . import exceptions as e
from .device import device
from .exceptions import check_error


class a1(device):
Expand Down Expand Up @@ -30,7 +30,7 @@ def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]

Expand Down
Loading