Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ will take care of advertising it on the local network, setting a HAP server and
running the Accessory. Take a look at [main.py](main.py) for a quick start on that.

```python
from pyhap.accessory import Accessory, Category
from pyhap.accessory import Accessory, AsyncAccessory, Category
import pyhap.loader as loader

class TemperatureSensor(Accessory):
### Async accessory - run method is run asynchronously in the event loop
class TemperatureSensor(AsyncAccessory):
"""Implementation of a mock temperature sensor accessory."""

category = Category.SENSOR # This is for the icon in the iOS Home app.
Expand Down Expand Up @@ -98,25 +99,31 @@ class TemperatureSensor(Accessory):
temp_sensor_service = loader.get_serv_loader().get("TemperatureSensor")
self.add_service(temp_sensor_service)

def run(self):
@AsyncAcessory.run_at_interval(3) # Run this method every 3 seconds
async def run(self):
"""We override this method to implement what the accessory will do when it is
started. An accessory is started and stopped from the AccessoryDriver.

It might be convenient to use the Accessory's run_sentinel, which is a
threading.Event object which is set when the accessory should stop running.

In this example, we wait 3 seconds to see if the run_sentinel will be set and if
not, we set the current temperature to a random number.
We set the current temperature to a random number. The decorator runs this method
every 3 seconds.
"""
while not self.run_sentinel.wait(3):
self.temp_char.set_value(random.randint(18, 26))
self.temp_char.set_value(random.randint(18, 26))

def stop(self):
"""We override this method to clean up any resources or perform final actions, as
this is called by the AccessoryDriver when the Accessory is being stopped (it is
called right after run_sentinel is set).
this is called by the AccessoryDriver when the Accessory is being stopped.
"""
print("Stopping accessory.")

### Synchronouse accessory - run method is in a thread
class SyncTemperatureSensor(Accessory):
"""Everything is same as in the TemperatureSensor, apart from the run method which is
not async.
"""

@Accessory.run_at_interval(3)
def run(self):
self.temp_char.set_value(random.randint(18, 26))
```

## Integrating non-compatible devices <a name="HttpAcc"></a>
Expand Down
7 changes: 5 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@
"""
import logging
import signal
import time
import random

from pyhap.accessories.TemperatureSensor import TemperatureSensor
from pyhap.accessory import Bridge
from pyhap.accessory_driver import AccessoryDriver
import pyhap.loader as loader

logging.basicConfig(level=logging.INFO)


def get_bridge():
"""Call this method to get a Bridge instead of a standalone accessory."""
bridge = Bridge(display_name="Bridge")
temp_sensor = TemperatureSensor("Termometer")
temp_sensor2 = TemperatureSensor("Termometer2")
temp_sensor = TemperatureSensor("Sensor 2")
temp_sensor2 = TemperatureSensor("Sensor 1")
bridge.add_accessory(temp_sensor)
bridge.add_accessory(temp_sensor2)

Expand Down
12 changes: 6 additions & 6 deletions pyhap/accessories/TemperatureSensor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# An Accessory mocking a temperature sensor.
# It changes its value every few seconds.
import asyncio
import random
import time

from pyhap.accessory import Accessory, Category
from pyhap.accessory import AsyncAccessory, Category
import pyhap.loader as loader


class TemperatureSensor(Accessory):
class TemperatureSensor(AsyncAccessory):

category = Category.SENSOR

Expand All @@ -22,6 +22,6 @@ def _set_services(self):
self.add_service(
loader.get_serv_loader().get("TemperatureSensor"))

def run(self):
while not self.run_sentinel.wait(3):
self.temp_char.set_value(random.randint(18, 26))
@AsyncAccessory.run_at_interval(3)
async def run(self):
self.temp_char.set_value(random.randint(18, 26))
91 changes: 81 additions & 10 deletions pyhap/accessory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import threading
import logging
import itertools
Expand All @@ -8,7 +9,7 @@
import base36
from pyqrcode import QRCode

import pyhap.util as util
from pyhap import util
from pyhap.loader import get_serv_loader

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -155,6 +156,8 @@ def __init__(self, display_name, aid=None, mac=None, pincode=None,
self.broker = None
# threading.Event that gets set when the Accessory should stop.
self.run_sentinel = None
self.event_loop = None
self.aio_stop_event = None

sk, vk = ed25519.create_keypair()
self.private_key = sk
Expand Down Expand Up @@ -211,7 +214,7 @@ def _set_services(self):
# FIXME: Need to ensure AccessoryInformation is with IID 1.
self.add_service(info_service)

def set_sentinel(self, run_sentinel):
def set_sentinel(self, run_sentinel, aio_stop_event, event_loop):
"""Assign a run sentinel that can signal stopping.

The run sentinel is a threading.Event object that can be used to manage
Expand All @@ -224,6 +227,8 @@ def set_sentinel(self, run_sentinel):
... sensor.readTemperature()
"""
self.run_sentinel = run_sentinel
self.aio_stop_event = aio_stop_event
self.event_loop = event_loop

def config_changed(self):
"""Notify the accessory about configuration changes.
Expand Down Expand Up @@ -376,6 +381,28 @@ def setup_message(self):
self.print_qr()
print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode())

def run_at_interval(seconds):
"""Decorator that runs decorated method in a while loop, which repeats every
``seconds`` until the ``Accessory.run_sentinel`` is set.

.. code-block:: python

@Accessory.run_at_interval(3)
def run(self):
print("Hello again world!")

:param seconds: The amount of seconds to wait for the event to be set.
Determines the interval on which the decorated method will be called.
:type seconds: float
"""
# decorator returns a decorator with the argument it got
def _repeat(func):
def _wrapper(self, *args, **kwargs):
while not self.run_sentinel.wait(seconds):
func(self, *args, **kwargs)
return _wrapper
return _repeat

def run(self):
"""Called when the Accessory should start doing its thing.

Expand Down Expand Up @@ -414,7 +441,39 @@ def publish(self, data, sender):
self.broker.publish(acc_data)


class Bridge(Accessory):
class AsyncAccessory(Accessory):

def run_at_interval(seconds):
"""Decorator that runs decorated method in a while loop, which repeats every
``seconds`` until the ``Accessory.aio_stop_event`` is set.

.. code-block:: python

@AsyncAccessory.run_at_interval(3)
async def run(self):
print("Hello again world!")

:param seconds: The amount of seconds to wait for the event to be set.
Determines the interval on which the decorated method will be called.
:type seconds: float
"""
# decorator returns a decorator with the argument it got
def _repeat(func):
async def _wrapper(self, *args, **kwargs):
while not await util.event_wait(self.aio_stop_event,
seconds,
self.event_loop):
await func(self, *args, **kwargs)
return _wrapper
return _repeat

async def run(self):
"""Override in the implementation if needed.
"""
pass


class Bridge(AsyncAccessory):
"""A representation of a HAP bridge.

A `Bridge` can have multiple `Accessories`.
Expand All @@ -433,11 +492,11 @@ def __init__(self, display_name, mac=None, pincode=None,
setup_id=setup_id)
self.accessories = {} # aid: acc

def set_sentinel(self, run_sentinel):
def set_sentinel(self, run_sentinel, aio_stop_event, event_loop):
"""Set the same sentinel to all contained accessories."""
super(Bridge, self).set_sentinel(run_sentinel)
super().set_sentinel(run_sentinel, aio_stop_event, event_loop)
for acc in self.accessories.values():
acc.set_sentinel(run_sentinel)
acc.set_sentinel(run_sentinel, aio_stop_event, event_loop)

def add_accessory(self, acc):
"""Add the given ``Accessory`` to this ``Bridge``.
Expand Down Expand Up @@ -498,12 +557,24 @@ def get_characteristic(self, aid, iid):

return acc.get_characteristic(aid, iid)

def run(self):
"""Creates and starts a new thread for each of the contained accessories' run
method.
async def _wrap_in_thread(self, method):
"""Coroutine which starts the given method in a thread.
"""
# Not going through event_loop.run_in_executor, because this thread may never
# terminate.
threading.Thread(target=method).start()

async def run(self):
"""Schedule tasks for each of the accessories' run method.
"""
tasks = []
for acc in self.accessories.values():
threading.Thread(target=acc.run).start()
if isinstance(acc, AsyncAccessory):
task = self.event_loop.create_task(acc.run())
else:
task = self.event_loop.create_task(self._wrap_in_thread(acc.run))
tasks.append(task)
await asyncio.gather(*tasks, loop=self.event_loop)

def stop(self):
"""Calls stop() on all contained accessories."""
Expand Down
34 changes: 22 additions & 12 deletions pyhap/accessory_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
or went to sleep before telling us. This concludes the publishing process from the
AccessoryDriver.
"""
import asyncio
import os
import logging
import socket
Expand All @@ -34,7 +35,7 @@

from zeroconf import ServiceInfo, Zeroconf

from pyhap.accessory import get_topic, STANDALONE_AID
from pyhap.accessory import AsyncAccessory, get_topic, STANDALONE_AID
from pyhap.characteristic import CharacteristicError
from pyhap.params import get_srp_context
from pyhap.hsrp import Server as SrpServer
Expand Down Expand Up @@ -156,6 +157,9 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state"
self.persist()
self.topics = {} # topic: set of (address, port) of subscribed clients
self.topic_lock = threading.Lock() # for exclusive access to the topics
self.event_loop = asyncio.get_event_loop()
self.aio_stop_event = asyncio.Event()
Copy link

@thomaspurchas thomaspurchas Apr 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a better, cleaner way to do this.

It should be possible to call stop_event.wait() using event_loop.run_in_executor() to wait on the threaded event in a separate thread managed by asyncio. Then put a callback on the returned future to call aio_stop_event.set() back in the main thread.

That way calling stop_event.set() anywhere is both threadsafe, and guaranteed to stop both threads and the event_loop.

I'll put together a PR once I figured out the nicest way the structure the code.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it will be better. Do you mind if we leave it for another change?

self.stop_event = threading.Event()
self.event_queue = queue.Queue() # (topic, bytes)
self.send_event_thread = None # the event dispatch thread
self.sent_events = 0
Expand All @@ -164,7 +168,6 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state"
self.accessory.set_broker(self)
self.mdns_service_info = None
self.srp_verifier = None
self.run_sentinel = None
self.accessory_thread = None

def subscribe_client_topic(self, client, topic, subscribe=True):
Expand Down Expand Up @@ -228,7 +231,7 @@ def send_events(self):
this is not run in a daemon thread or it is run on the main thread, the app will
hang.
"""
while not self.run_sentinel.is_set():
while not self.event_loop.is_closed():
# Maybe consider having a pool of worker threads, each performing a send in
# order to increase throughput.
topic, bytedata = self.event_queue.get()
Expand Down Expand Up @@ -462,12 +465,6 @@ def start(self):
logger.info("Starting accessory '%s' on address '%s', port '%s'.",
self.accessory.display_name, self.address, self.port)

# Start the accessory so it can do stuff.
self.run_sentinel = threading.Event()
self.accessory.set_sentinel(self.run_sentinel)
self.accessory_thread = threading.Thread(target=self.accessory.run)
self.accessory_thread.start()

# Start sending events to clients. This is done in a daemon thread, because:
# - if the queue is blocked waiting on an empty queue, then there is nothing left
# for clean up.
Expand All @@ -492,6 +489,18 @@ def start(self):
if not self.accessory.paired:
self.accessory.setup_message()

# Start the accessory so it can do stuff.
self.accessory.set_sentinel(self.stop_event, self.aio_stop_event, self.event_loop)
if isinstance(self.accessory, AsyncAccessory):
self.accessory_task = self.event_loop.create_task(self.accessory.run())
else:
self.accessory_task = self.event_loop.run_in_executor(None,
self.accessory.run)
logger.info("Starting event loop")
self.event_loop.run_until_complete(self.accessory_task)
self.event_loop.close()
logger.info("Stopped event loop.")

def stop(self):
"""Stop the accessory.

Expand All @@ -504,10 +513,11 @@ def stop(self):
# to ensure that sending with a closed server will not crash the app.
logger.info("Stoping accessory '%s' on address %s, port %s.",
self.accessory.display_name, self.address, self.port)
logger.debug("Setting run sentinel, stopping accessory and event sending")
self.run_sentinel.set()
logger.debug("Setting stop events, stopping accessory and event sending")
self.stop_event.set()
if not self.event_loop.is_closed():
self.event_loop.call_soon_threadsafe(self.aio_stop_event.set)
self.accessory.stop()
self.accessory_thread.join()

logger.debug("Stopping mDNS advertising")
self.advertiser.unregister_service(self.mdns_service_info)
Expand Down
19 changes: 19 additions & 0 deletions pyhap/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import socket
import random
import binascii
Expand Down Expand Up @@ -113,3 +114,21 @@ def hex2b(hex):
fromhex = bytes.fromhex if sys.version_info >= (3, 5) else hex2b
"""Python-version-agnostic fromhex function. Equivalent to bytes.fromhex in python 3.5+.
"""

async def event_wait(event, timeout, loop=None):
"""Wait for the given event to be set or for the timeout to expire.

:param event: The event to wait for.
:type event: asyncio.Event

:param timeout: The timeout for which to wait, in seconds.
:type timeout: float

:return: ``event.is_set()``
:rtype: bool
"""
try:
await asyncio.wait_for(event.wait(), timeout, loop=loop)
except asyncio.TimeoutError:
pass
return event.is_set()
Loading