From cef28488f8fead9e5dfdd294989bfbe557996db4 Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Sat, 31 Mar 2018 17:54:40 +0300 Subject: [PATCH 1/6] Move to coroutines for Accessory.run Problems to solve: - Support non-coroutine run methods - not all code could switch to asyncio, these should be started in a thread as before. - Port accessories/* --- pyhap/accessories/TemperatureSensor.py | 7 +++-- pyhap/accessory.py | 13 ++++----- pyhap/accessory_driver.py | 37 ++++++++++++++++++++++---- tox.ini | 3 +-- 4 files changed, 45 insertions(+), 15 deletions(-) diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index 9ec518b4..c402fd43 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -1,5 +1,6 @@ # An Accessory mocking a temperature sensor. # It changes its value every few seconds. +import asyncio import random import time @@ -22,6 +23,8 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - def run(self): - while not self.run_sentinel.wait(3): + async def run(self, loop, stop_event): + while not stop_event.is_set(): + await asyncio.sleep(3) self.temp_char.set_value(random.randint(18, 26)) + print(self.display_name, self.temp_char.value) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index 54a71931..9f7f06ef 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -1,3 +1,4 @@ +import asyncio import threading import logging import itertools @@ -376,7 +377,7 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) - def run(self): + async def run(self, loop, stop_event): """Called when the Accessory should start doing its thing. Called when HAP server is running, advertising is set, etc. @@ -498,12 +499,12 @@ def get_characteristic(self, aid, iid): return acc.get_characteristic(aid, iid) - def run(self): - """Creates and starts a new thread for each of the contained accessories' run - method. + async def run(self, loop, stop_event): + """Schedule tasks for each of the accessories' run method. """ - for acc in self.accessories.values(): - threading.Thread(target=acc.run).start() + all_accessory_tasks = (acc.run(loop, stop_event) + for acc in self.accessories.values()) + await asyncio.gather(*all_accessory_tasks, loop=loop) def stop(self): """Calls stop() on all contained accessories.""" diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index 18a4009e..84ac4df0 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -21,6 +21,9 @@ or went to sleep before telling us. This concludes the publishing process from the AccessoryDriver. """ +import asyncio +from concurrent.futures import CancelledError +import functools import os import logging import socket @@ -96,6 +99,32 @@ class HAP_CONSTANTS: SERVICE_COMMUNICATION_FAILURE = -70402 +class AIOThread(threading.Thread): + """ + TODO: temporary convenience class. Create an event loop and event object for + controlling the Accessory.run method. + """ + + def __init__(self, run_method): + """ + """ + self.loop = asyncio.new_event_loop() + self.stop_event = asyncio.Event() + self.task = self.loop.create_task(run_method(self.loop, self.stop_event)) + super(AIOThread, self).__init__(target=self.loop.run_until_complete, + args=(self.task,)) + + def run(self): + try: + super(AIOThread, self).run() + except CancelledError: + self.loop.stop() + self.loop.close() + logger.info("Sucessfully stopped accessory event loop.") + + def stop(self): + self.loop.call_soon_threadsafe(self.task.cancel) + class AccessoryDriver(object): """ An AccessoryDriver mediates between incoming requests from the HAPServer and @@ -228,7 +257,7 @@ def send_events(self): this is not run in a daemon thread or it is run on the main thread, the app will hang. """ - while not self.run_sentinel.is_set(): + while not self.accessory_thread.is_alive(): # self.run_sentinel.is_set(): # Maybe consider having a pool of worker threads, each performing a send in # order to increase throughput. topic, bytedata = self.event_queue.get() @@ -463,9 +492,7 @@ def start(self): self.accessory.display_name, self.address, self.port) # Start the accessory so it can do stuff. - self.run_sentinel = threading.Event() - self.accessory.set_sentinel(self.run_sentinel) - self.accessory_thread = threading.Thread(target=self.accessory.run) + self.accessory_thread = AIOThread(self.accessory.run) self.accessory_thread.start() # Start sending events to clients. This is done in a daemon thread, because: @@ -505,8 +532,8 @@ def stop(self): logger.info("Stoping accessory '%s' on address %s, port %s.", self.accessory.display_name, self.address, self.port) logger.debug("Setting run sentinel, stopping accessory and event sending") - self.run_sentinel.set() self.accessory.stop() + self.accessory_thread.stop() self.accessory_thread.join() logger.debug("Stopping mDNS advertising") diff --git a/tox.ini b/tox.ini index 3df6605d..99660e16 100644 --- a/tox.ini +++ b/tox.ini @@ -1,9 +1,8 @@ [tox] -envlist = py34,py35,py36,docs +envlist = py35,py36,docs skip_missing_interpreters = True [tox:travis] -3.4 = py34 3.5 = py35 3.6 = py36, docs From 06c95476b117f3d116856eed9c891d39753ff09d Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Sat, 31 Mar 2018 23:38:49 +0300 Subject: [PATCH 2/6] Add support for non-async run methods. AccessoryDriver.stop still fails, because the stop_event is an asyncio.Event being set from another thread. --- main.py | 33 ++++++++++++++++++++++---- pyhap/accessories/TemperatureSensor.py | 2 +- pyhap/accessory.py | 24 +++++++++++++++---- pyhap/accessory_driver.py | 2 +- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/main.py b/main.py index 2dc28bf2..b1e48389 100755 --- a/main.py +++ b/main.py @@ -7,19 +7,44 @@ """ import logging import signal +import time +import random from pyhap.accessories.TemperatureSensor import TemperatureSensor -from pyhap.accessory import Bridge +from pyhap.accessory import Bridge, Accessory, Category from pyhap.accessory_driver import AccessoryDriver +import pyhap.loader as loader -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) + + +class SyncTemperatureSensor(Accessory): + + category = Category.SENSOR + + def __init__(self, *args, **kwargs): + super(SyncTemperatureSensor, self).__init__(*args, **kwargs) + + self.temp_char = self.get_service("TemperatureSensor")\ + .get_characteristic("CurrentTemperature") + + def _set_services(self): + super(SyncTemperatureSensor, self)._set_services() + self.add_service( + loader.get_serv_loader().get("TemperatureSensor")) + + def run(self, stop_event, loop=None): + while not stop_event.is_set(): # This is not being set because it is from another thread. + time.sleep(3) + self.temp_char.set_value(random.randint(18, 26)) + print(self.display_name, self.temp_char.value) def get_bridge(): """Call this method to get a Bridge instead of a standalone accessory.""" bridge = Bridge(display_name="Bridge") temp_sensor = TemperatureSensor("Termometer") - temp_sensor2 = TemperatureSensor("Termometer2") + temp_sensor2 = SyncTemperatureSensor("Termometer2") bridge.add_accessory(temp_sensor) bridge.add_accessory(temp_sensor2) @@ -36,7 +61,7 @@ def get_accessory(): return acc -acc = get_accessory() # Change to get_bridge() if you want to run a Bridge. +acc = get_bridge() # Change to get_bridge() if you want to run a Bridge. # Start the accessory on port 51826 driver = AccessoryDriver(acc, port=51826) diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index c402fd43..aec1734f 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -23,7 +23,7 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - async def run(self, loop, stop_event): + async def run(self, stop_event, loop=None): while not stop_event.is_set(): await asyncio.sleep(3) self.temp_char.set_value(random.randint(18, 26)) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index 9f7f06ef..ff79853f 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -377,7 +377,7 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) - async def run(self, loop, stop_event): + async def run(self, stop_event, loop=None): """Called when the Accessory should start doing its thing. Called when HAP server is running, advertising is set, etc. @@ -499,12 +499,26 @@ def get_characteristic(self, aid, iid): return acc.get_characteristic(aid, iid) - async def run(self, loop, stop_event): + async def run(self, stop_event, loop=None): """Schedule tasks for each of the accessories' run method. """ - all_accessory_tasks = (acc.run(loop, stop_event) - for acc in self.accessories.values()) - await asyncio.gather(*all_accessory_tasks, loop=loop) + coroutines = [] # Accessories with async run method + syncs = [] # Accessories with non-async run method + + for acc in self.accessories.values(): + if asyncio.iscoroutinefunction(acc.run): + coroutines.append(acc) + else: + syncs.append(acc) + + logger.debug("Coroutines: %s", coroutines) + logger.debug("Synchronous: %s", syncs) + + for acc in syncs: + threading.Thread(target=acc.run, args=(stop_event,)).start() + + accessory_futures = (acc.run(stop_event, loop) for acc in coroutines) + await asyncio.gather(*accessory_futures, loop=loop) def stop(self): """Calls stop() on all contained accessories.""" diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index 84ac4df0..d7fe14f4 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -110,7 +110,7 @@ def __init__(self, run_method): """ self.loop = asyncio.new_event_loop() self.stop_event = asyncio.Event() - self.task = self.loop.create_task(run_method(self.loop, self.stop_event)) + self.task = self.loop.create_task(run_method(self.stop_event, self.loop)) super(AIOThread, self).__init__(target=self.loop.run_until_complete, args=(self.task,)) From 9f9d7aa636aafec7279ed48b30b42e47490d3e33 Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Wed, 4 Apr 2018 09:14:24 +0300 Subject: [PATCH 3/6] Allow for both sync and async run method. This change moves to a cleaner use of asyncio. Introduced Accessory.start which creates a task for Accessory.run. The idea is to maintain compatibility with clients that cannot support coroutines and to allow Accessories to be run both as a thread and as a coroutine. This is still work in progress. --- main.py | 9 ++-- pyhap/accessories/TemperatureSensor.py | 10 ++-- pyhap/accessory.py | 66 ++++++++++++++++---------- pyhap/accessory_driver.py | 55 +++++++-------------- pyhap/util.py | 19 ++++++++ 5 files changed, 87 insertions(+), 72 deletions(-) diff --git a/main.py b/main.py index b1e48389..60865b0a 100755 --- a/main.py +++ b/main.py @@ -33,9 +33,8 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - def run(self, stop_event, loop=None): - while not stop_event.is_set(): # This is not being set because it is from another thread. - time.sleep(3) + def run(self): + while not self.run_sentinel.wait(3): self.temp_char.set_value(random.randint(18, 26)) print(self.display_name, self.temp_char.value) @@ -43,8 +42,8 @@ def run(self, stop_event, loop=None): def get_bridge(): """Call this method to get a Bridge instead of a standalone accessory.""" bridge = Bridge(display_name="Bridge") - temp_sensor = TemperatureSensor("Termometer") - temp_sensor2 = SyncTemperatureSensor("Termometer2") + temp_sensor = TemperatureSensor("Aio") + temp_sensor2 = SyncTemperatureSensor("Synchronised") bridge.add_accessory(temp_sensor) bridge.add_accessory(temp_sensor2) diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index aec1734f..0dfe44e3 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -7,7 +7,6 @@ from pyhap.accessory import Accessory, Category import pyhap.loader as loader - class TemperatureSensor(Accessory): category = Category.SENSOR @@ -23,8 +22,7 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - async def run(self, stop_event, loop=None): - while not stop_event.is_set(): - await asyncio.sleep(3) - self.temp_char.set_value(random.randint(18, 26)) - print(self.display_name, self.temp_char.value) + @Accessory.repeat(3) + async def run(self): + self.temp_char.set_value(random.randint(18, 26)) + print(self.display_name, self.temp_char.value) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index ff79853f..dcbc1783 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -9,7 +9,7 @@ import base36 from pyqrcode import QRCode -import pyhap.util as util +from pyhap import util from pyhap.loader import get_serv_loader logger = logging.getLogger(__name__) @@ -156,6 +156,8 @@ def __init__(self, display_name, aid=None, mac=None, pincode=None, self.broker = None # threading.Event that gets set when the Accessory should stop. self.run_sentinel = None + self.event_loop = None + self.aio_stop_event = None sk, vk = ed25519.create_keypair() self.private_key = sk @@ -212,7 +214,7 @@ def _set_services(self): # FIXME: Need to ensure AccessoryInformation is with IID 1. self.add_service(info_service) - def set_sentinel(self, run_sentinel): + def set_sentinel(self, run_sentinel, aio_stop_event, event_loop): """Assign a run sentinel that can signal stopping. The run sentinel is a threading.Event object that can be used to manage @@ -225,6 +227,8 @@ def set_sentinel(self, run_sentinel): ... sensor.readTemperature() """ self.run_sentinel = run_sentinel + self.aio_stop_event = aio_stop_event + self.event_loop = event_loop def config_changed(self): """Notify the accessory about configuration changes. @@ -377,13 +381,42 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) - async def run(self, stop_event, loop=None): + def repeat(seconds): + """Decorator that runs decorated method in a while loop, which repeats every + ``seconds``. It ues the ``Accessory.aio_stop_event``. + + :param seconds: The amount of seconds to wait for the event to be set. + Determines the interval on which the decorated method will be called. + :type seconds: float + """ + def _repeat(func): + async def _wrapper(self, *args, **kwargs): + while not await util.event_wait(self.aio_stop_event, seconds): + await func(self, *args, **kwargs) + return _wrapper + return _repeat + + async def run(self): """Called when the Accessory should start doing its thing. Called when HAP server is running, advertising is set, etc. """ pass + async def _wrap_in_thread(self): + threading.Thread(target=self.run).start() + + async def start(self): + """Create and await on a task for ``Accessory.run`` + + If ``Accessory.run`` is not a coroutine, it will be wrapped in a task that + starts it in a new thread. + """ + if asyncio.iscoroutinefunction(self.run): + await self.event_loop.create_task(self.run()) + else: + await self.event_loop.create_task(self._wrap_in_thread()) + def stop(self): """Called when the Accessory should stop what is doing and clean up any resources. """ @@ -434,11 +467,11 @@ def __init__(self, display_name, mac=None, pincode=None, setup_id=setup_id) self.accessories = {} # aid: acc - def set_sentinel(self, run_sentinel): + def set_sentinel(self, run_sentinel, aio_stop_event, event_loop): """Set the same sentinel to all contained accessories.""" - super(Bridge, self).set_sentinel(run_sentinel) + super(Bridge, self).set_sentinel(run_sentinel, aio_stop_event, event_loop) for acc in self.accessories.values(): - acc.set_sentinel(run_sentinel) + acc.set_sentinel(run_sentinel, aio_stop_event, event_loop) def add_accessory(self, acc): """Add the given ``Accessory`` to this ``Bridge``. @@ -499,26 +532,11 @@ def get_characteristic(self, aid, iid): return acc.get_characteristic(aid, iid) - async def run(self, stop_event, loop=None): + async def start(self): """Schedule tasks for each of the accessories' run method. """ - coroutines = [] # Accessories with async run method - syncs = [] # Accessories with non-async run method - - for acc in self.accessories.values(): - if asyncio.iscoroutinefunction(acc.run): - coroutines.append(acc) - else: - syncs.append(acc) - - logger.debug("Coroutines: %s", coroutines) - logger.debug("Synchronous: %s", syncs) - - for acc in syncs: - threading.Thread(target=acc.run, args=(stop_event,)).start() - - accessory_futures = (acc.run(stop_event, loop) for acc in coroutines) - await asyncio.gather(*accessory_futures, loop=loop) + tasks = (acc.start() for acc in self.accessories.values()) + await asyncio.gather(*tasks, loop=self.event_loop) def stop(self): """Calls stop() on all contained accessories.""" diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index d7fe14f4..e730b943 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -22,7 +22,7 @@ AccessoryDriver. """ import asyncio -from concurrent.futures import CancelledError +from concurrent.futures import CancelledError, ThreadPoolExecutor import functools import os import logging @@ -99,32 +99,6 @@ class HAP_CONSTANTS: SERVICE_COMMUNICATION_FAILURE = -70402 -class AIOThread(threading.Thread): - """ - TODO: temporary convenience class. Create an event loop and event object for - controlling the Accessory.run method. - """ - - def __init__(self, run_method): - """ - """ - self.loop = asyncio.new_event_loop() - self.stop_event = asyncio.Event() - self.task = self.loop.create_task(run_method(self.stop_event, self.loop)) - super(AIOThread, self).__init__(target=self.loop.run_until_complete, - args=(self.task,)) - - def run(self): - try: - super(AIOThread, self).run() - except CancelledError: - self.loop.stop() - self.loop.close() - logger.info("Sucessfully stopped accessory event loop.") - - def stop(self): - self.loop.call_soon_threadsafe(self.task.cancel) - class AccessoryDriver(object): """ An AccessoryDriver mediates between incoming requests from the HAPServer and @@ -137,7 +111,7 @@ class AccessoryDriver(object): NUM_EVENTS_BEFORE_STATS = 100 def __init__(self, accessory, port, address=None, persist_file="accessory.state", - encoder=None): + encoder=None, max_acc_threads=3): """ :param accessory: The `Accessory` to be managed by this driver. The `Accessory` must have the standalone AID (`pyhap.accessory.STANDALONE_AID`). If the @@ -185,6 +159,9 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state" self.persist() self.topics = {} # topic: set of (address, port) of subscribed clients self.topic_lock = threading.Lock() # for exclusive access to the topics + self.event_loop = asyncio.get_event_loop() + self.aio_stop_event = asyncio.Event() + self.stop_event = threading.Event() self.event_queue = queue.Queue() # (topic, bytes) self.send_event_thread = None # the event dispatch thread self.sent_events = 0 @@ -193,7 +170,6 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state" self.accessory.set_broker(self) self.mdns_service_info = None self.srp_verifier = None - self.run_sentinel = None self.accessory_thread = None def subscribe_client_topic(self, client, topic, subscribe=True): @@ -257,7 +233,7 @@ def send_events(self): this is not run in a daemon thread or it is run on the main thread, the app will hang. """ - while not self.accessory_thread.is_alive(): # self.run_sentinel.is_set(): + while not self.event_loop.is_closed(): # Maybe consider having a pool of worker threads, each performing a send in # order to increase throughput. topic, bytedata = self.event_queue.get() @@ -491,10 +467,6 @@ def start(self): logger.info("Starting accessory '%s' on address '%s', port '%s'.", self.accessory.display_name, self.address, self.port) - # Start the accessory so it can do stuff. - self.accessory_thread = AIOThread(self.accessory.run) - self.accessory_thread.start() - # Start sending events to clients. This is done in a daemon thread, because: # - if the queue is blocked waiting on an empty queue, then there is nothing left # for clean up. @@ -519,6 +491,15 @@ def start(self): if not self.accessory.paired: self.accessory.setup_message() + # Start the accessory so it can do stuff. + self.accessory.set_sentinel(self.stop_event, self.aio_stop_event, self.event_loop) + self.accessory_task = self.event_loop.create_task(self.accessory.start()) + logger.info("Starting event loop") + self.event_loop.run_until_complete(self.accessory_task) + self.event_loop.stop() + self.event_loop.close() + logger.info("Stopped event loop.") + def stop(self): """Stop the accessory. @@ -531,10 +512,10 @@ def stop(self): # to ensure that sending with a closed server will not crash the app. logger.info("Stoping accessory '%s' on address %s, port %s.", self.accessory.display_name, self.address, self.port) - logger.debug("Setting run sentinel, stopping accessory and event sending") + logger.debug("Setting stop events, stopping accessory and event sending") + self.stop_event.set() + self.event_loop.call_soon_threadsafe(self.aio_stop_event.set) self.accessory.stop() - self.accessory_thread.stop() - self.accessory_thread.join() logger.debug("Stopping mDNS advertising") self.advertiser.unregister_service(self.mdns_service_info) diff --git a/pyhap/util.py b/pyhap/util.py index 93f7234c..587a54dc 100644 --- a/pyhap/util.py +++ b/pyhap/util.py @@ -1,3 +1,4 @@ +import asyncio import socket import random import binascii @@ -113,3 +114,21 @@ def hex2b(hex): fromhex = bytes.fromhex if sys.version_info >= (3, 5) else hex2b """Python-version-agnostic fromhex function. Equivalent to bytes.fromhex in python 3.5+. """ + +async def event_wait(event, timeout, loop=None): + """Wait for the given event to be set or for the timeout to expire. + + :param event: The event to wait for. + :type event: asyncio.Event + + :param timeout: The timeout for which to wait, in seconds. + :type timeout: float + + :return: ``event.is_set()`` + :rtype: bool + """ + try: + await asyncio.wait_for(event.wait(), timeout) + except asyncio.TimeoutError: + pass + return event.is_set() \ No newline at end of file From a8b0bb6802d28a8a7124e4364817576cf2960316 Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Wed, 4 Apr 2018 23:09:34 +0300 Subject: [PATCH 4/6] Introduce AsyncAccessory, tidying asyncio integ. --- main.py | 2 +- pyhap/accessories/TemperatureSensor.py | 6 +- pyhap/accessory.py | 85 +++++++++++++++----------- pyhap/accessory_driver.py | 14 +++-- pyhap/util.py | 2 +- tests/test_accessory_driver.py | 42 ++++++++++++- 6 files changed, 105 insertions(+), 46 deletions(-) diff --git a/main.py b/main.py index 60865b0a..ec45371d 100755 --- a/main.py +++ b/main.py @@ -56,7 +56,7 @@ def get_bridge(): def get_accessory(): """Call this method to get a standalone Accessory.""" - acc = TemperatureSensor("MyTempSensor") + acc = SyncTemperatureSensor("MyTempSensor") return acc diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index 0dfe44e3..b75d2d9b 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -4,10 +4,10 @@ import random import time -from pyhap.accessory import Accessory, Category +from pyhap.accessory import AsyncAccessory, Category import pyhap.loader as loader -class TemperatureSensor(Accessory): +class TemperatureSensor(AsyncAccessory): category = Category.SENSOR @@ -22,7 +22,7 @@ def _set_services(self): self.add_service( loader.get_serv_loader().get("TemperatureSensor")) - @Accessory.repeat(3) + @AsyncAccessory.run_at_interval(3) async def run(self): self.temp_char.set_value(random.randint(18, 26)) print(self.display_name, self.temp_char.value) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index dcbc1783..58a21c09 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -381,42 +381,13 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) - def repeat(seconds): - """Decorator that runs decorated method in a while loop, which repeats every - ``seconds``. It ues the ``Accessory.aio_stop_event``. - - :param seconds: The amount of seconds to wait for the event to be set. - Determines the interval on which the decorated method will be called. - :type seconds: float - """ - def _repeat(func): - async def _wrapper(self, *args, **kwargs): - while not await util.event_wait(self.aio_stop_event, seconds): - await func(self, *args, **kwargs) - return _wrapper - return _repeat - - async def run(self): + def run(self): """Called when the Accessory should start doing its thing. Called when HAP server is running, advertising is set, etc. """ pass - async def _wrap_in_thread(self): - threading.Thread(target=self.run).start() - - async def start(self): - """Create and await on a task for ``Accessory.run`` - - If ``Accessory.run`` is not a coroutine, it will be wrapped in a task that - starts it in a new thread. - """ - if asyncio.iscoroutinefunction(self.run): - await self.event_loop.create_task(self.run()) - else: - await self.event_loop.create_task(self._wrap_in_thread()) - def stop(self): """Called when the Accessory should stop what is doing and clean up any resources. """ @@ -448,7 +419,40 @@ def publish(self, data, sender): self.broker.publish(acc_data) -class Bridge(Accessory): +class AsyncAccessory(Accessory): + + def run_at_interval(seconds): + """Decorator that runs decorated method in a while loop, which repeats every + ``seconds`` until the ``Accessory.aio_stop_event`` is set. + + Can be used as: + .. code-block:: python + + @AsyncAccessory.run_at_interval(3) + async def run(self): + print("Hello again world!") + + :param seconds: The amount of seconds to wait for the event to be set. + Determines the interval on which the decorated method will be called. + :type seconds: float + """ + # decorator returns a decorator with the argument it got + def _repeat(func): + async def _wrapper(self, *args, **kwargs): + while not await util.event_wait(self.aio_stop_event, + seconds, + self.event_loop): + await func(self, *args, **kwargs) + return _wrapper + return _repeat + + async def run(self): + """Override in the implementation if needed. + """ + pass + + +class Bridge(AsyncAccessory): """A representation of a HAP bridge. A `Bridge` can have multiple `Accessories`. @@ -469,7 +473,7 @@ def __init__(self, display_name, mac=None, pincode=None, def set_sentinel(self, run_sentinel, aio_stop_event, event_loop): """Set the same sentinel to all contained accessories.""" - super(Bridge, self).set_sentinel(run_sentinel, aio_stop_event, event_loop) + super().set_sentinel(run_sentinel, aio_stop_event, event_loop) for acc in self.accessories.values(): acc.set_sentinel(run_sentinel, aio_stop_event, event_loop) @@ -532,10 +536,23 @@ def get_characteristic(self, aid, iid): return acc.get_characteristic(aid, iid) - async def start(self): + async def _wrap_in_thread(self, method): + """Coroutine which starts the given method in a thread. + """ + # Not going through event_loop.run_in_executor, because this thread may never + # terminate. + threading.Thread(target=method).start() + + async def run(self): """Schedule tasks for each of the accessories' run method. """ - tasks = (acc.start() for acc in self.accessories.values()) + tasks = [] + for acc in self.accessories.values(): + if isinstance(acc, AsyncAccessory): + task = self.event_loop.create_task(acc.run()) + else: + task = self.event_loop.create_task(self._wrap_in_thread(acc.run)) + tasks.append(task) await asyncio.gather(*tasks, loop=self.event_loop) def stop(self): diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index e730b943..0a973c2d 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -22,8 +22,6 @@ AccessoryDriver. """ import asyncio -from concurrent.futures import CancelledError, ThreadPoolExecutor -import functools import os import logging import socket @@ -37,7 +35,7 @@ from zeroconf import ServiceInfo, Zeroconf -from pyhap.accessory import get_topic, STANDALONE_AID +from pyhap.accessory import AsyncAccessory, get_topic, STANDALONE_AID from pyhap.characteristic import CharacteristicError from pyhap.params import get_srp_context from pyhap.hsrp import Server as SrpServer @@ -493,10 +491,13 @@ def start(self): # Start the accessory so it can do stuff. self.accessory.set_sentinel(self.stop_event, self.aio_stop_event, self.event_loop) - self.accessory_task = self.event_loop.create_task(self.accessory.start()) + if isinstance(self.accessory, AsyncAccessory): + self.accessory_task = self.event_loop.create_task(self.accessory.run()) + else: + self.accessory_task = self.event_loop.run_in_executor(None, + self.accessory.run) logger.info("Starting event loop") self.event_loop.run_until_complete(self.accessory_task) - self.event_loop.stop() self.event_loop.close() logger.info("Stopped event loop.") @@ -514,7 +515,8 @@ def stop(self): self.accessory.display_name, self.address, self.port) logger.debug("Setting stop events, stopping accessory and event sending") self.stop_event.set() - self.event_loop.call_soon_threadsafe(self.aio_stop_event.set) + if not self.event_loop.is_closed(): + self.event_loop.call_soon_threadsafe(self.aio_stop_event.set) self.accessory.stop() logger.debug("Stopping mDNS advertising") diff --git a/pyhap/util.py b/pyhap/util.py index 587a54dc..8d9d3bad 100644 --- a/pyhap/util.py +++ b/pyhap/util.py @@ -128,7 +128,7 @@ async def event_wait(event, timeout, loop=None): :rtype: bool """ try: - await asyncio.wait_for(event.wait(), timeout) + await asyncio.wait_for(event.wait(), timeout, loop=loop) except asyncio.TimeoutError: pass return event.is_set() \ No newline at end of file diff --git a/tests/test_accessory_driver.py b/tests/test_accessory_driver.py index 1f8d9a42..6b611c74 100644 --- a/tests/test_accessory_driver.py +++ b/tests/test_accessory_driver.py @@ -1,6 +1,7 @@ """ Tests for pyhap.accessory_driver """ +import asyncio import os import tempfile from unittest.mock import patch, Mock @@ -8,7 +9,7 @@ import pytest from pyhap.accessory import (Accessory, - Bridge, + AsyncAccessory, STANDALONE_AID) from pyhap.accessory_driver import AccessoryDriver @@ -47,3 +48,42 @@ def get_acc(): assert driver.accessory.public_key == pk finally: os.remove(persist_file) + + +@patch("pyhap.accessory_driver.asyncio.get_event_loop", + new=Mock(side_effect=asyncio.new_event_loop)) +@patch("pyhap.accessory_driver.Zeroconf", new=Mock()) +@patch("pyhap.accessory_driver.AccessoryDriver.persist") +@patch("pyhap.accessory_driver.HAPServer", new=Mock()) +def test_start_stop_sync_acc(_persist): + class Acc(Accessory): + running = True + def run(self): + while self.run_sentinel.wait(2): + pass + self.running = False + def setup_message(self): pass + + acc = Acc("TestAcc") + driver = AccessoryDriver(acc, 51234, persist_file="foo") + driver.start() + driver.stop() + assert not acc.running + + +#TODO: This test is failing when there is no patch for the get_event_loop. +# Something is getting the default loop and not the new loop +@patch("pyhap.accessory_driver.Zeroconf", new=Mock()) +@patch("pyhap.accessory_driver.AccessoryDriver.persist") +@patch("pyhap.accessory_driver.HAPServer", new=Mock()) +def test_start_stop_async_acc(_persist): + class Acc(AsyncAccessory): + @AsyncAccessory.run_at_interval(2) + async def run(self): + driver.stop() + def setup_message(self): pass + + acc = Acc("TestAcc") + driver = AccessoryDriver(acc, 51234, persist_file="foo") + driver.start() + assert driver.event_loop.is_closed() \ No newline at end of file From f7470052c868a75165b6dba0eb76d9ceda996229 Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Fri, 6 Apr 2018 12:17:44 +0300 Subject: [PATCH 5/6] run_at_interval for sync accessories; readme --- README.md | 31 ++++++++++++++++---------- main.py | 31 +++++--------------------- pyhap/accessories/TemperatureSensor.py | 1 - pyhap/accessory.py | 22 ++++++++++++++++++ pyhap/accessory_driver.py | 2 +- 5 files changed, 47 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 74ed0f45..f84b339f 100644 --- a/README.md +++ b/README.md @@ -54,10 +54,11 @@ will take care of advertising it on the local network, setting a HAP server and running the Accessory. Take a look at [main.py](main.py) for a quick start on that. ```python -from pyhap.accessory import Accessory, Category +from pyhap.accessory import Accessory, AsyncAccessory, Category import pyhap.loader as loader -class TemperatureSensor(Accessory): +### Async accessory - run method is run asynchronously in the event loop +class TemperatureSensor(AsyncAccessory): """Implementation of a mock temperature sensor accessory.""" category = Category.SENSOR # This is for the icon in the iOS Home app. @@ -98,25 +99,31 @@ class TemperatureSensor(Accessory): temp_sensor_service = loader.get_serv_loader().get("TemperatureSensor") self.add_service(temp_sensor_service) - def run(self): + @AsyncAcessory.run_at_interval(3) # Run this method every 3 seconds + async def run(self): """We override this method to implement what the accessory will do when it is started. An accessory is started and stopped from the AccessoryDriver. - It might be convenient to use the Accessory's run_sentinel, which is a - threading.Event object which is set when the accessory should stop running. - - In this example, we wait 3 seconds to see if the run_sentinel will be set and if - not, we set the current temperature to a random number. + We set the current temperature to a random number. The decorator runs this method + every 3 seconds. """ - while not self.run_sentinel.wait(3): - self.temp_char.set_value(random.randint(18, 26)) + self.temp_char.set_value(random.randint(18, 26)) def stop(self): """We override this method to clean up any resources or perform final actions, as - this is called by the AccessoryDriver when the Accessory is being stopped (it is - called right after run_sentinel is set). + this is called by the AccessoryDriver when the Accessory is being stopped. """ print("Stopping accessory.") + +### Synchronouse accessory - run method is in a thread +class SyncTemperatureSensor(Accessory): + """Everything is same as in the TemperatureSensor, apart from the run method which is + not async. + """ + + @Accessory.run_at_interval(3) + def run(self): + self.temp_char.set_value(random.randint(18, 26)) ``` ## Integrating non-compatible devices diff --git a/main.py b/main.py index ec45371d..4448c7de 100755 --- a/main.py +++ b/main.py @@ -11,39 +11,18 @@ import random from pyhap.accessories.TemperatureSensor import TemperatureSensor -from pyhap.accessory import Bridge, Accessory, Category +from pyhap.accessory import Bridge, AsyncAccessory, Accessory, Category from pyhap.accessory_driver import AccessoryDriver import pyhap.loader as loader -logging.basicConfig(level=logging.DEBUG) - - -class SyncTemperatureSensor(Accessory): - - category = Category.SENSOR - - def __init__(self, *args, **kwargs): - super(SyncTemperatureSensor, self).__init__(*args, **kwargs) - - self.temp_char = self.get_service("TemperatureSensor")\ - .get_characteristic("CurrentTemperature") - - def _set_services(self): - super(SyncTemperatureSensor, self)._set_services() - self.add_service( - loader.get_serv_loader().get("TemperatureSensor")) - - def run(self): - while not self.run_sentinel.wait(3): - self.temp_char.set_value(random.randint(18, 26)) - print(self.display_name, self.temp_char.value) +logging.basicConfig(level=logging.INFO) def get_bridge(): """Call this method to get a Bridge instead of a standalone accessory.""" bridge = Bridge(display_name="Bridge") - temp_sensor = TemperatureSensor("Aio") - temp_sensor2 = SyncTemperatureSensor("Synchronised") + temp_sensor = TemperatureSensor("Sensor 2") + temp_sensor2 = TemperatureSensor("Sensor 1") bridge.add_accessory(temp_sensor) bridge.add_accessory(temp_sensor2) @@ -56,7 +35,7 @@ def get_bridge(): def get_accessory(): """Call this method to get a standalone Accessory.""" - acc = SyncTemperatureSensor("MyTempSensor") + acc = TemperatureSensor("MyTempSensor") return acc diff --git a/pyhap/accessories/TemperatureSensor.py b/pyhap/accessories/TemperatureSensor.py index b75d2d9b..27c881b1 100644 --- a/pyhap/accessories/TemperatureSensor.py +++ b/pyhap/accessories/TemperatureSensor.py @@ -25,4 +25,3 @@ def _set_services(self): @AsyncAccessory.run_at_interval(3) async def run(self): self.temp_char.set_value(random.randint(18, 26)) - print(self.display_name, self.temp_char.value) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index 58a21c09..b2242e2c 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -381,6 +381,28 @@ def setup_message(self): self.print_qr() print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode()) + def run_at_interval(seconds): + """Decorator that runs decorated method in a while loop, which repeats every + ``seconds`` until the ``Accessory.run_sentinel`` is set. + + .. code-block:: python + + @Accessory.run_at_interval(3) + def run(self): + print("Hello again world!") + + :param seconds: The amount of seconds to wait for the event to be set. + Determines the interval on which the decorated method will be called. + :type seconds: float + """ + # decorator returns a decorator with the argument it got + def _repeat(func): + def _wrapper(self, *args, **kwargs): + while not self.run_sentinel.wait(seconds): + func(self, *args, **kwargs) + return _wrapper + return _repeat + def run(self): """Called when the Accessory should start doing its thing. diff --git a/pyhap/accessory_driver.py b/pyhap/accessory_driver.py index 0a973c2d..3d3582a5 100755 --- a/pyhap/accessory_driver.py +++ b/pyhap/accessory_driver.py @@ -109,7 +109,7 @@ class AccessoryDriver(object): NUM_EVENTS_BEFORE_STATS = 100 def __init__(self, accessory, port, address=None, persist_file="accessory.state", - encoder=None, max_acc_threads=3): + encoder=None): """ :param accessory: The `Accessory` to be managed by this driver. The `Accessory` must have the standalone AID (`pyhap.accessory.STANDALONE_AID`). If the From 99706c1efc397c3049a8fadad84342f3da9617d0 Mon Sep 17 00:00:00 2001 From: Ivan Kalchev Date: Fri, 6 Apr 2018 12:24:04 +0300 Subject: [PATCH 6/6] Doc changes, removed unused imports --- main.py | 4 ++-- pyhap/accessory.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/main.py b/main.py index 4448c7de..3b22eb02 100755 --- a/main.py +++ b/main.py @@ -11,7 +11,7 @@ import random from pyhap.accessories.TemperatureSensor import TemperatureSensor -from pyhap.accessory import Bridge, AsyncAccessory, Accessory, Category +from pyhap.accessory import Bridge from pyhap.accessory_driver import AccessoryDriver import pyhap.loader as loader @@ -39,7 +39,7 @@ def get_accessory(): return acc -acc = get_bridge() # Change to get_bridge() if you want to run a Bridge. +acc = get_accessory() # Change to get_bridge() if you want to run a Bridge. # Start the accessory on port 51826 driver = AccessoryDriver(acc, port=51826) diff --git a/pyhap/accessory.py b/pyhap/accessory.py index b2242e2c..e3dd8aa3 100755 --- a/pyhap/accessory.py +++ b/pyhap/accessory.py @@ -447,12 +447,11 @@ def run_at_interval(seconds): """Decorator that runs decorated method in a while loop, which repeats every ``seconds`` until the ``Accessory.aio_stop_event`` is set. - Can be used as: .. code-block:: python - @AsyncAccessory.run_at_interval(3) - async def run(self): - print("Hello again world!") + @AsyncAccessory.run_at_interval(3) + async def run(self): + print("Hello again world!") :param seconds: The amount of seconds to wait for the event to be set. Determines the interval on which the decorated method will be called.