diff --git a/README.md b/README.md index 74ed0f45..f84b339f 100644 --- a/README.md +++ b/README.md @@ -54,10 +54,11 @@ will take care of advertising it on the local network, setting a HAP server and running the Accessory. Take a look at [main.py](main.py) for a quick start on that. ```python -from pyhap.accessory import Accessory, Category +from pyhap.accessory import Accessory, AsyncAccessory, Category import pyhap.loader as loader -class TemperatureSensor(Accessory): +### Async accessory - run method is run asynchronously in the event loop +class TemperatureSensor(AsyncAccessory): """Implementation of a mock temperature sensor accessory.""" category = Category.SENSOR # This is for the icon in the iOS Home app. @@ -98,25 +99,31 @@ class TemperatureSensor(Accessory): temp_sensor_service = loader.get_serv_loader().get("TemperatureSensor") self.add_service(temp_sensor_service) - def run(self): + @AsyncAcessory.run_at_interval(3) # Run this method every 3 seconds + async def run(self): """We override this method to implement what the accessory will do when it is started. An accessory is started and stopped from the AccessoryDriver. - It might be convenient to use the Accessory's run_sentinel, which is a - threading.Event object which is set when the accessory should stop running. - - In this example, we wait 3 seconds to see if the run_sentinel will be set and if - not, we set the current temperature to a random number. + We set the current temperature to a random number. The decorator runs this method + every 3 seconds. """ - while not self.run_sentinel.wait(3): - self.temp_char.set_value(random.randint(18, 26)) + self.temp_char.set_value(random.randint(18, 26)) def stop(self): """We override this method to clean up any resources or perform final actions, as - this is called by the AccessoryDriver when the Accessory is being stopped (it is - called right after run_sentinel is set). + this is called by the AccessoryDriver when the Accessory is being stopped. """ print("Stopping accessory.") + +### Synchronouse accessory - run method is in a thread +class SyncTemperatureSensor(Accessory): + """Everything is same as in the TemperatureSensor, apart from the run method which is + not async. + """ + + @Accessory.run_at_interval(3) + def run(self): + self.temp_char.set_value(random.randint(18, 26)) ``` ## Integrating non-compatible devices diff --git a/main.py b/main.py index 2dc28bf2..3b22eb02 100755 --- a/main.py +++ b/main.py @@ -7,10 +7,13 @@ """ import logging import signal +import time +import random from pyhap.accessories.TemperatureSensor import TemperatureSensor from pyhap.accessory import Bridge from pyhap.accessory_driver import AccessoryDriver +import pyhap.loader as loader logging.basicConfig(level=logging.INFO) @@ -18,8 +21,8 @@ def get_bridge(): """Call this method to get a Bridge instead of a standalone accessory.""" bridge = Bridge(display_name="Bridge") - temp_sensor = TemperatureSensor("Termometer") - temp_sensor2 = TemperatureSensor("Termometer2") + temp_sensor = TemperatureSensor("Sensor 2") + temp_sensor2 = TemperatureSensor("Sensor 1") bridge.add_accessory(temp_sensor) bridge.add_accessory(temp_sensor2) diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index 9ec518b4..27c881b1 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -1,13 +1,13 @@ # An Accessory mocking a temperature sensor. # It changes its value every few seconds. +import asyncio import random import time -from pyhap.accessory import Accessory, Category +from pyhap.accessory import AsyncAccessory, Category import pyhap.loader as loader - -class TemperatureSensor(Accessory): +class TemperatureSensor(AsyncAccessory): category = Category.SENSOR @@ -22,6 +22,6 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - def run(self): - while not self.run_sentinel.wait(3): - self.temp_char.set_value(random.randint(18, 26)) + @AsyncAccessory.run_at_interval(3) + async def run(self): + self.temp_char.set_value(random.randint(18, 26)) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index 54a71931..e3dd8aa3 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -1,3 +1,4 @@ +import asyncio import threading import logging import itertools @@ -8,7 +9,7 @@ import base36 from pyqrcode import QRCode -import pyhap.util as util +from pyhap import util from pyhap.loader import get_serv_loader logger = logging.getLogger(__name__) @@ -155,6 +156,8 @@ def __init__(self, display_name, aid=None, mac=None, pincode=None, self.broker = None # threading.Event that gets set when the Accessory should stop. self.run_sentinel = None + self.event_loop = None + self.aio_stop_event = None sk, vk = ed25519.create_keypair() self.private_key = sk @@ -211,7 +214,7 @@ def _set_services(self): # FIXME: Need to ensure AccessoryInformation is with IID 1. self.add_service(info_service) - def set_sentinel(self, run_sentinel): + def set_sentinel(self, run_sentinel, aio_stop_event, event_loop): """Assign a run sentinel that can signal stopping. The run sentinel is a threading.Event object that can be used to manage @@ -224,6 +227,8 @@ def set_sentinel(self, run_sentinel): ... sensor.readTemperature() """ self.run_sentinel = run_sentinel + self.aio_stop_event = aio_stop_event + self.event_loop = event_loop def config_changed(self): """Notify the accessory about configuration changes. @@ -376,6 +381,28 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) + def run_at_interval(seconds): + """Decorator that runs decorated method in a while loop, which repeats every + ``seconds`` until the ``Accessory.run_sentinel`` is set. + + .. code-block:: python + + @Accessory.run_at_interval(3) + def run(self): + print("Hello again world!") + + :param seconds: The amount of seconds to wait for the event to be set. + Determines the interval on which the decorated method will be called. + :type seconds: float + """ + # decorator returns a decorator with the argument it got + def _repeat(func): + def _wrapper(self, *args, **kwargs): + while not self.run_sentinel.wait(seconds): + func(self, *args, **kwargs) + return _wrapper + return _repeat + def run(self): """Called when the Accessory should start doing its thing. @@ -414,7 +441,39 @@ def publish(self, data, sender): self.broker.publish(acc_data) -class Bridge(Accessory): +class AsyncAccessory(Accessory): + + def run_at_interval(seconds): + """Decorator that runs decorated method in a while loop, which repeats every + ``seconds`` until the ``Accessory.aio_stop_event`` is set. + + .. code-block:: python + + @AsyncAccessory.run_at_interval(3) + async def run(self): + print("Hello again world!") + + :param seconds: The amount of seconds to wait for the event to be set. + Determines the interval on which the decorated method will be called. + :type seconds: float + """ + # decorator returns a decorator with the argument it got + def _repeat(func): + async def _wrapper(self, *args, **kwargs): + while not await util.event_wait(self.aio_stop_event, + seconds, + self.event_loop): + await func(self, *args, **kwargs) + return _wrapper + return _repeat + + async def run(self): + """Override in the implementation if needed. + """ + pass + + +class Bridge(AsyncAccessory): """A representation of a HAP bridge. A `Bridge` can have multiple `Accessories`. @@ -433,11 +492,11 @@ def __init__(self, display_name, mac=None, pincode=None, setup_id=setup_id) self.accessories = {} # aid: acc - def set_sentinel(self, run_sentinel): + def set_sentinel(self, run_sentinel, aio_stop_event, event_loop): """Set the same sentinel to all contained accessories.""" - super(Bridge, self).set_sentinel(run_sentinel) + super().set_sentinel(run_sentinel, aio_stop_event, event_loop) for acc in self.accessories.values(): - acc.set_sentinel(run_sentinel) + acc.set_sentinel(run_sentinel, aio_stop_event, event_loop) def add_accessory(self, acc): """Add the given ``Accessory`` to this ``Bridge``. @@ -498,12 +557,24 @@ def get_characteristic(self, aid, iid): return acc.get_characteristic(aid, iid) - def run(self): - """Creates and starts a new thread for each of the contained accessories' run - method. + async def _wrap_in_thread(self, method): + """Coroutine which starts the given method in a thread. + """ + # Not going through event_loop.run_in_executor, because this thread may never + # terminate. + threading.Thread(target=method).start() + + async def run(self): + """Schedule tasks for each of the accessories' run method. """ + tasks = [] for acc in self.accessories.values(): - threading.Thread(target=acc.run).start() + if isinstance(acc, AsyncAccessory): + task = self.event_loop.create_task(acc.run()) + else: + task = self.event_loop.create_task(self._wrap_in_thread(acc.run)) + tasks.append(task) + await asyncio.gather(*tasks, loop=self.event_loop) def stop(self): """Calls stop() on all contained accessories.""" diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index 18a4009e..3d3582a5 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -21,6 +21,7 @@ or went to sleep before telling us. This concludes the publishing process from the AccessoryDriver. """ +import asyncio import os import logging import socket @@ -34,7 +35,7 @@ from zeroconf import ServiceInfo, Zeroconf -from pyhap.accessory import get_topic, STANDALONE_AID +from pyhap.accessory import AsyncAccessory, get_topic, STANDALONE_AID from pyhap.characteristic import CharacteristicError from pyhap.params import get_srp_context from pyhap.hsrp import Server as SrpServer @@ -156,6 +157,9 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state" self.persist() self.topics = {} # topic: set of (address, port) of subscribed clients self.topic_lock = threading.Lock() # for exclusive access to the topics + self.event_loop = asyncio.get_event_loop() + self.aio_stop_event = asyncio.Event() + self.stop_event = threading.Event() self.event_queue = queue.Queue() # (topic, bytes) self.send_event_thread = None # the event dispatch thread self.sent_events = 0 @@ -164,7 +168,6 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state" self.accessory.set_broker(self) self.mdns_service_info = None self.srp_verifier = None - self.run_sentinel = None self.accessory_thread = None def subscribe_client_topic(self, client, topic, subscribe=True): @@ -228,7 +231,7 @@ def send_events(self): this is not run in a daemon thread or it is run on the main thread, the app will hang. """ - while not self.run_sentinel.is_set(): + while not self.event_loop.is_closed(): # Maybe consider having a pool of worker threads, each performing a send in # order to increase throughput. topic, bytedata = self.event_queue.get() @@ -462,12 +465,6 @@ def start(self): logger.info("Starting accessory '%s' on address '%s', port '%s'.", self.accessory.display_name, self.address, self.port) - # Start the accessory so it can do stuff. - self.run_sentinel = threading.Event() - self.accessory.set_sentinel(self.run_sentinel) - self.accessory_thread = threading.Thread(target=self.accessory.run) - self.accessory_thread.start() - # Start sending events to clients. This is done in a daemon thread, because: # - if the queue is blocked waiting on an empty queue, then there is nothing left # for clean up. @@ -492,6 +489,18 @@ def start(self): if not self.accessory.paired: self.accessory.setup_message() + # Start the accessory so it can do stuff. + self.accessory.set_sentinel(self.stop_event, self.aio_stop_event, self.event_loop) + if isinstance(self.accessory, AsyncAccessory): + self.accessory_task = self.event_loop.create_task(self.accessory.run()) + else: + self.accessory_task = self.event_loop.run_in_executor(None, + self.accessory.run) + logger.info("Starting event loop") + self.event_loop.run_until_complete(self.accessory_task) + self.event_loop.close() + logger.info("Stopped event loop.") + def stop(self): """Stop the accessory. @@ -504,10 +513,11 @@ def stop(self): # to ensure that sending with a closed server will not crash the app. logger.info("Stoping accessory '%s' on address %s, port %s.", self.accessory.display_name, self.address, self.port) - logger.debug("Setting run sentinel, stopping accessory and event sending") - self.run_sentinel.set() + logger.debug("Setting stop events, stopping accessory and event sending") + self.stop_event.set() + if not self.event_loop.is_closed(): + self.event_loop.call_soon_threadsafe(self.aio_stop_event.set) self.accessory.stop() - self.accessory_thread.join() logger.debug("Stopping mDNS advertising") self.advertiser.unregister_service(self.mdns_service_info) diff --git a/pyhap/util.py b/pyhap/util.py index 93f7234c..8d9d3bad 100644 --- a/pyhap/util.py +++ b/pyhap/util.py @@ -1,3 +1,4 @@ +import asyncio import socket import random import binascii @@ -113,3 +114,21 @@ def hex2b(hex): fromhex = bytes.fromhex if sys.version_info >= (3, 5) else hex2b """Python-version-agnostic fromhex function. Equivalent to bytes.fromhex in python 3.5+. """ + +async def event_wait(event, timeout, loop=None): + """Wait for the given event to be set or for the timeout to expire. + + :param event: The event to wait for. + :type event: asyncio.Event + + :param timeout: The timeout for which to wait, in seconds. + :type timeout: float + + :return: ``event.is_set()`` + :rtype: bool + """ + try: + await asyncio.wait_for(event.wait(), timeout, loop=loop) + except asyncio.TimeoutError: + pass + return event.is_set() \ No newline at end of file diff --git a/tests/test_accessory_driver.py b/tests/test_accessory_driver.py index 1f8d9a42..6b611c74 100644 --- a/tests/test_accessory_driver.py +++ b/tests/test_accessory_driver.py @@ -1,6 +1,7 @@ """ Tests for pyhap.accessory_driver """ +import asyncio import os import tempfile from unittest.mock import patch, Mock @@ -8,7 +9,7 @@ import pytest from pyhap.accessory import (Accessory, - Bridge, + AsyncAccessory, STANDALONE_AID) from pyhap.accessory_driver import AccessoryDriver @@ -47,3 +48,42 @@ def get_acc(): assert driver.accessory.public_key == pk finally: os.remove(persist_file) + + +@patch("pyhap.accessory_driver.asyncio.get_event_loop", + new=Mock(side_effect=asyncio.new_event_loop)) +@patch("pyhap.accessory_driver.Zeroconf", new=Mock()) +@patch("pyhap.accessory_driver.AccessoryDriver.persist") +@patch("pyhap.accessory_driver.HAPServer", new=Mock()) +def test_start_stop_sync_acc(_persist): + class Acc(Accessory): + running = True + def run(self): + while self.run_sentinel.wait(2): + pass + self.running = False + def setup_message(self): pass + + acc = Acc("TestAcc") + driver = AccessoryDriver(acc, 51234, persist_file="foo") + driver.start() + driver.stop() + assert not acc.running + + +#TODO: This test is failing when there is no patch for the get_event_loop. +# Something is getting the default loop and not the new loop +@patch("pyhap.accessory_driver.Zeroconf", new=Mock()) +@patch("pyhap.accessory_driver.AccessoryDriver.persist") +@patch("pyhap.accessory_driver.HAPServer", new=Mock()) +def test_start_stop_async_acc(_persist): + class Acc(AsyncAccessory): + @AsyncAccessory.run_at_interval(2) + async def run(self): + driver.stop() + def setup_message(self): pass + + acc = Acc("TestAcc") + driver = AccessoryDriver(acc, 51234, persist_file="foo") + driver.start() + assert driver.event_loop.is_closed() \ No newline at end of file diff --git a/tox.ini b/tox.ini index 3df6605d..99660e16 100644 --- a/tox.ini +++ b/tox.ini @@ -1,9 +1,8 @@ [tox] -envlist = py34,py35,py36,docs +envlist = py35,py36,docs skip_missing_interpreters = True [tox:travis] -3.4 = py34 3.5 = py35 3.6 = py36, docs