Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/examples/sensor_stream.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

PMSx003 4 samples on default format
2021-07-29 15:57:01: PM1 1.0, PM2.5 11.0, PM10 12.0 μg/m3
2021-07-29 15:57:21: PM1 0.0, PM2.5 6.0, PM10 6.0 μg/m3
2021-07-29 15:57:41: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3
2021-07-29 15:58:01: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3
17 changes: 17 additions & 0 deletions docs/examples/sensor_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Read PMSx003 sensor on /dev/ttyUSB0.

Use a low-level Stream class for more granular
control of the sensor.
"""

from pms.core import SensorStream

stream = SensorStream(sensor="PMSx003", port="/dev/ttyUSB0")

print("\nPMSx003 4 samples on default format")

stream.open()
for _ in range(4):
print(stream.read())
stream.close()
10 changes: 10 additions & 0 deletions docs/library_usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ This section contain some help for those brave enough to use its internals as a
--8<-- "read_two_sensors.out"
```

=== "sensor stream (low-level API)"

``` python
--8<-- "sensor_stream.py"
```

```
--8<-- "sensor_stream.out"
```


## Observation data fields

Expand Down
10 changes: 8 additions & 2 deletions src/pms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ class WrongMessageChecksum(SensorWarning):
pass


class SensorWarmingUp(SensorWarning):
class SensorNotReady(SensorWarning):
"""Sensor not ready to read: observations not reliable"""

pass


class SensorWarmingUp(SensorNotReady):
"""Empty message: throw away observation and wait until sensor warms up"""

pass


class InconsistentObservation(SensorWarning):
class InconsistentObservation(SensorNotReady):
"""Message payload makes no sense: throw away observation"""

pass
9 changes: 8 additions & 1 deletion src/pms/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
from .sensor import Sensor, Supported # isort: skip
from .reader import MessageReader, SensorReader, UnableToRead, exit_on_fail
from .reader import (
MessageReader,
MessageStream,
SensorReader,
SensorStream,
UnableToRead,
exit_on_fail,
)
221 changes: 158 additions & 63 deletions src/pms/core/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
import sys
import time
from abc import abstractmethod
from contextlib import AbstractContextManager, contextmanager
from contextlib import contextmanager
from csv import DictReader
from dataclasses import dataclass
from pathlib import Path
from textwrap import wrap
from typing import Iterator, NamedTuple, Optional, Union, overload

from serial import Serial
from typer import progressbar

from pms import InconsistentObservation, SensorWarmingUp, SensorWarning, logger
from pms import SensorNotReady, SensorWarning, logger
from pms.core import Sensor, Supported

from .types import ObsData
Expand All @@ -33,6 +34,14 @@ class UnableToRead(Exception):
pass


class StreamNotReady(Exception):
pass


class TemporaryFailure(Exception):
pass
Comment on lines +37 to +42
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe these and the "old" exceptions should be collected on a separate module

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can do. I'll come back to this when we decide what should happen with the PR overall.



class RawData(NamedTuple):
"""raw messages with timestamp"""

Expand All @@ -50,33 +59,101 @@ def hexdump(self, line: Optional[int] = None) -> str:
return f"{offset:08x}: {hex} {dump}"


class Reader(AbstractContextManager):
@dataclass
class Reading:
buffer: bytes
obs_data: ObsData

@property
def raw_data(self) -> RawData:
return RawData(self.time, self.buffer)

@property
def time(self) -> int:
return self.obs_data.time


class Stream:
"""
Standard interface to read data.
"""

@abstractmethod
def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]:
def read(self) -> Reading:
...

@abstractmethod
def open(self) -> None:
...

@abstractmethod
def close(self) -> None:
...


class Reader:
def __init__(
self,
*,
stream: Stream,
interval: Optional[int] = None,
samples: Optional[int] = None,
) -> None:
self.interval = interval
self.samples = samples
self.stream = stream

def __call__(self, *, raw: Optional[bool] = None) -> Iterator[Union[RawData, ObsData]]:
"""
Return an iterator of ObsData.

If "raw" is set to True, then ObsData is replaced with RawData.
"""
...

sample = 0
try:
while True:
try:
reading = self.stream.read()
except StreamNotReady as e:
logger.debug(e)
time.sleep(5)
except TemporaryFailure as e:
logger.debug(e)
else:
yield reading.raw_data if raw else reading.obs_data
sample += 1
if self.samples is not None and sample >= self.samples:
break
if self.interval:
delay = self.interval - (time.time() - reading.time)
if delay > 0:
time.sleep(delay)
except KeyboardInterrupt: # pragma: no cover
print()
except StopIteration:
pass

def __enter__(self):
self.stream.open()
return self

class SensorReader(Reader):
"""Read sensor messages from serial port
def __exit__(self, *args) -> None:
self.stream.close()

The sensor is woken up after opening the serial port, and put to sleep when before closing the port.
While the serial port is open, the sensor is read in passive mode.

class SensorStream(Stream):
"""Read sensor messages from serial port

PMS3003 sensors do not accept serial commands, such as wake/sleep or passive mode read.
Valid messages are extracted from the serial buffer.
"""

def __init__(
self,
*,
sensor: Union[Sensor, Supported, str] = Supported.default,
port: str = "/dev/ttyUSB0",
interval: Optional[int] = None,
samples: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Configure serial port"""
Expand All @@ -86,12 +163,6 @@ def __init__(
self.serial.port = port
self.serial.baudrate = self.sensor.baud
self.serial.timeout = timeout or 5 # max time to wake up sensor
self.interval = interval
self.samples = samples
logger.debug(
f"capture {samples if samples else '?'} {sensor} obs "
f"from {port} every {interval if interval else '?'} secs"
)

def _cmd(self, command: str) -> bytes:
"""Write command to sensor and return answer"""
Expand Down Expand Up @@ -119,7 +190,25 @@ def _pre_heat(self):
# only pre-heat the firs time
self.pre_heat = 0

def __enter__(self) -> "SensorReader":
def read(self) -> Reading:
"""Return a single passive mode reading"""

if not self.serial.is_open:
raise StopIteration

buffer = self._cmd("passive_read")

try:
obs = self.sensor.decode(buffer)
return Reading(buffer=buffer, obs_data=obs)
except SensorNotReady as e:
# no special hardware handling
raise StreamNotReady
except SensorWarning as e: # pragma: no cover
self.serial.reset_input_buffer()
raise TemporaryFailure

def open(self) -> None:
"""Open serial port and sensor setup"""
if not self.serial.is_open:
logger.debug(f"open {self.serial.port}")
Expand All @@ -143,73 +232,79 @@ def __enter__(self) -> "SensorReader":
logger.error(f"Sensor is not {self.sensor.name}")
raise UnableToRead("Sensor failed validation")

return self

def __exit__(self, exception_type, exception_value, traceback) -> None:
def close(self) -> None:
"""Put sensor to sleep and close serial port"""
logger.debug(f"sleep {self.sensor}")
buffer = self._cmd("sleep")
logger.debug(f"close {self.serial.port}")
self.serial.close()

def __call__(self, *, raw: Optional[bool] = None):
"""Passive mode reading at regular intervals"""

sample = 0
while self.serial.is_open:
try:
buffer = self._cmd("passive_read")
class SensorReader(Reader):
"""Read sensor messages from serial port

try:
obs = self.sensor.decode(buffer)
except (SensorWarmingUp, InconsistentObservation) as e:
logger.debug(e)
time.sleep(5)
except SensorWarning as e: # pragma: no cover
logger.debug(e)
self.serial.reset_input_buffer()
else:
yield RawData(obs.time, buffer) if raw else obs
sample += 1
if self.samples is not None and sample >= self.samples:
break
if self.interval:
delay = self.interval - (time.time() - obs.time)
if delay > 0:
time.sleep(delay)
except KeyboardInterrupt: # pragma: no cover
print()
break
The sensor is woken up after opening the serial port, and put to sleep when before closing the port.
While the serial port is open, the sensor is read in passive mode.
"""

def __init__(
self,
sensor: Union[Sensor, Supported, str] = Supported.default,
port: str = "/dev/ttyUSB0",
interval: Optional[int] = None,
samples: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
super().__init__(
stream=SensorStream(
sensor=sensor,
port=port,
timeout=timeout,
),
samples=samples,
interval=interval,
)
logger.debug(
f"capture {samples if samples else '?'} {sensor} obs "
f"from {port} every {interval if interval else '?'} secs"
)

class MessageReader(Reader):
def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None:
@property
def sensor(self):
return self.stream.sensor


class MessageStream(Stream):
def __init__(self, *, path: Path, sensor: Sensor) -> None:
self.path = path
self.sensor = sensor
self.samples = samples

def __enter__(self) -> "MessageReader":
def read(self) -> Reading:
if not hasattr(self, "data"):
raise StopIteration

row = next(self.data)
time, message = int(row["time"]), bytes.fromhex(row["hex"])
obs = self.sensor.decode(message, time=time)
return Reading(buffer=message, obs_data=obs)

def open(self) -> None:
logger.debug(f"open {self.path}")
self.csv = self.path.open()
reader = DictReader(self.csv)
self.data = (row for row in reader if row["sensor"] == self.sensor.name)
return self

def __exit__(self, exception_type, exception_value, traceback) -> None:
def close(self) -> None:
logger.debug(f"close {self.path}")
self.csv.close()

def __call__(self, *, raw: Optional[bool] = None):
if not hasattr(self, "data"):
return

for row in self.data:
time, message = int(row["time"]), bytes.fromhex(row["hex"])
yield RawData(time, message) if raw else self.sensor.decode(message, time=time)
if self.samples:
self.samples -= 1
if self.samples <= 0:
break
class MessageReader(Reader):
def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None:
super().__init__(
stream=MessageStream(path=path, sensor=sensor),
samples=samples,
)


@contextmanager
Expand Down
Loading