diff --git a/docs/examples/sensor_stream.out b/docs/examples/sensor_stream.out new file mode 100644 index 0000000..032406c --- /dev/null +++ b/docs/examples/sensor_stream.out @@ -0,0 +1,6 @@ + +PMSx003 4 samples on default format +2021-07-29 15:57:01: PM1 1.0, PM2.5 11.0, PM10 12.0 μg/m3 +2021-07-29 15:57:21: PM1 0.0, PM2.5 6.0, PM10 6.0 μg/m3 +2021-07-29 15:57:41: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3 +2021-07-29 15:58:01: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3 diff --git a/docs/examples/sensor_stream.py b/docs/examples/sensor_stream.py new file mode 100644 index 0000000..2fc8219 --- /dev/null +++ b/docs/examples/sensor_stream.py @@ -0,0 +1,17 @@ +""" +Read PMSx003 sensor on /dev/ttyUSB0. + +Use a low-level Stream class for more granular +control of the sensor. +""" + +from pms.core import SensorStream + +stream = SensorStream(sensor="PMSx003", port="/dev/ttyUSB0") + +print("\nPMSx003 4 samples on default format") + +stream.open() +for _ in range(4): + print(stream.read()) +stream.close() diff --git a/docs/library_usage.md b/docs/library_usage.md index 7efc3a1..e9a7974 100644 --- a/docs/library_usage.md +++ b/docs/library_usage.md @@ -27,6 +27,16 @@ This section contain some help for those brave enough to use its internals as a --8<-- "read_two_sensors.out" ``` +=== "sensor stream (low-level API)" + + ``` python + --8<-- "sensor_stream.py" + ``` + + ``` + --8<-- "sensor_stream.out" + ``` + ## Observation data fields diff --git a/src/pms/__init__.py b/src/pms/__init__.py index 4549e10..cd8635f 100644 --- a/src/pms/__init__.py +++ b/src/pms/__init__.py @@ -21,13 +21,19 @@ class WrongMessageChecksum(SensorWarning): pass -class SensorWarmingUp(SensorWarning): +class SensorNotReady(SensorWarning): + """Sensor not ready to read: observations not reliable""" + + pass + + +class SensorWarmingUp(SensorNotReady): """Empty message: throw away observation and wait until sensor warms up""" pass -class InconsistentObservation(SensorWarning): +class InconsistentObservation(SensorNotReady): """Message payload makes no sense: throw away observation""" pass diff --git a/src/pms/core/__init__.py b/src/pms/core/__init__.py index 10a9dfa..b1499c7 100644 --- a/src/pms/core/__init__.py +++ b/src/pms/core/__init__.py @@ -1,2 +1,9 @@ from .sensor import Sensor, Supported # isort: skip -from .reader import MessageReader, SensorReader, UnableToRead, exit_on_fail +from .reader import ( + MessageReader, + MessageStream, + SensorReader, + SensorStream, + UnableToRead, + exit_on_fail, +) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 1cabf5d..d7239b2 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -9,8 +9,9 @@ import sys import time from abc import abstractmethod -from contextlib import AbstractContextManager, contextmanager +from contextlib import contextmanager from csv import DictReader +from dataclasses import dataclass from pathlib import Path from textwrap import wrap from typing import Iterator, NamedTuple, Optional, Union, overload @@ -18,7 +19,7 @@ from serial import Serial from typer import progressbar -from pms import InconsistentObservation, SensorWarmingUp, SensorWarning, logger +from pms import SensorNotReady, SensorWarning, logger from pms.core import Sensor, Supported from .types import ObsData @@ -33,6 +34,14 @@ class UnableToRead(Exception): pass +class StreamNotReady(Exception): + pass + + +class TemporaryFailure(Exception): + pass + + class RawData(NamedTuple): """raw messages with timestamp""" @@ -50,22 +59,91 @@ def hexdump(self, line: Optional[int] = None) -> str: return f"{offset:08x}: {hex} {dump}" -class Reader(AbstractContextManager): +@dataclass +class Reading: + buffer: bytes + obs_data: ObsData + + @property + def raw_data(self) -> RawData: + return RawData(self.time, self.buffer) + + @property + def time(self) -> int: + return self.obs_data.time + + +class Stream: + """ + Standard interface to read data. + """ + @abstractmethod - def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]: + def read(self) -> Reading: + ... + + @abstractmethod + def open(self) -> None: + ... + + @abstractmethod + def close(self) -> None: + ... + + +class Reader: + def __init__( + self, + *, + stream: Stream, + interval: Optional[int] = None, + samples: Optional[int] = None, + ) -> None: + self.interval = interval + self.samples = samples + self.stream = stream + + def __call__(self, *, raw: Optional[bool] = None) -> Iterator[Union[RawData, ObsData]]: """ Return an iterator of ObsData. If "raw" is set to True, then ObsData is replaced with RawData. """ - ... + sample = 0 + try: + while True: + try: + reading = self.stream.read() + except StreamNotReady as e: + logger.debug(e) + time.sleep(5) + except TemporaryFailure as e: + logger.debug(e) + else: + yield reading.raw_data if raw else reading.obs_data + sample += 1 + if self.samples is not None and sample >= self.samples: + break + if self.interval: + delay = self.interval - (time.time() - reading.time) + if delay > 0: + time.sleep(delay) + except KeyboardInterrupt: # pragma: no cover + print() + except StopIteration: + pass + + def __enter__(self): + self.stream.open() + return self -class SensorReader(Reader): - """Read sensor messages from serial port + def __exit__(self, *args) -> None: + self.stream.close() - The sensor is woken up after opening the serial port, and put to sleep when before closing the port. - While the serial port is open, the sensor is read in passive mode. + +class SensorStream(Stream): + """Read sensor messages from serial port PMS3003 sensors do not accept serial commands, such as wake/sleep or passive mode read. Valid messages are extracted from the serial buffer. @@ -73,10 +151,9 @@ class SensorReader(Reader): def __init__( self, + *, sensor: Union[Sensor, Supported, str] = Supported.default, port: str = "/dev/ttyUSB0", - interval: Optional[int] = None, - samples: Optional[int] = None, timeout: Optional[float] = None, ) -> None: """Configure serial port""" @@ -86,12 +163,6 @@ def __init__( self.serial.port = port self.serial.baudrate = self.sensor.baud self.serial.timeout = timeout or 5 # max time to wake up sensor - self.interval = interval - self.samples = samples - logger.debug( - f"capture {samples if samples else '?'} {sensor} obs " - f"from {port} every {interval if interval else '?'} secs" - ) def _cmd(self, command: str) -> bytes: """Write command to sensor and return answer""" @@ -119,7 +190,25 @@ def _pre_heat(self): # only pre-heat the firs time self.pre_heat = 0 - def __enter__(self) -> "SensorReader": + def read(self) -> Reading: + """Return a single passive mode reading""" + + if not self.serial.is_open: + raise StopIteration + + buffer = self._cmd("passive_read") + + try: + obs = self.sensor.decode(buffer) + return Reading(buffer=buffer, obs_data=obs) + except SensorNotReady as e: + # no special hardware handling + raise StreamNotReady + except SensorWarning as e: # pragma: no cover + self.serial.reset_input_buffer() + raise TemporaryFailure + + def open(self) -> None: """Open serial port and sensor setup""" if not self.serial.is_open: logger.debug(f"open {self.serial.port}") @@ -143,73 +232,79 @@ def __enter__(self) -> "SensorReader": logger.error(f"Sensor is not {self.sensor.name}") raise UnableToRead("Sensor failed validation") - return self - - def __exit__(self, exception_type, exception_value, traceback) -> None: + def close(self) -> None: """Put sensor to sleep and close serial port""" logger.debug(f"sleep {self.sensor}") buffer = self._cmd("sleep") logger.debug(f"close {self.serial.port}") self.serial.close() - def __call__(self, *, raw: Optional[bool] = None): - """Passive mode reading at regular intervals""" - sample = 0 - while self.serial.is_open: - try: - buffer = self._cmd("passive_read") +class SensorReader(Reader): + """Read sensor messages from serial port - try: - obs = self.sensor.decode(buffer) - except (SensorWarmingUp, InconsistentObservation) as e: - logger.debug(e) - time.sleep(5) - except SensorWarning as e: # pragma: no cover - logger.debug(e) - self.serial.reset_input_buffer() - else: - yield RawData(obs.time, buffer) if raw else obs - sample += 1 - if self.samples is not None and sample >= self.samples: - break - if self.interval: - delay = self.interval - (time.time() - obs.time) - if delay > 0: - time.sleep(delay) - except KeyboardInterrupt: # pragma: no cover - print() - break + The sensor is woken up after opening the serial port, and put to sleep when before closing the port. + While the serial port is open, the sensor is read in passive mode. + """ + def __init__( + self, + sensor: Union[Sensor, Supported, str] = Supported.default, + port: str = "/dev/ttyUSB0", + interval: Optional[int] = None, + samples: Optional[int] = None, + timeout: Optional[float] = None, + ) -> None: + super().__init__( + stream=SensorStream( + sensor=sensor, + port=port, + timeout=timeout, + ), + samples=samples, + interval=interval, + ) + logger.debug( + f"capture {samples if samples else '?'} {sensor} obs " + f"from {port} every {interval if interval else '?'} secs" + ) -class MessageReader(Reader): - def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None: + @property + def sensor(self): + return self.stream.sensor + + +class MessageStream(Stream): + def __init__(self, *, path: Path, sensor: Sensor) -> None: self.path = path self.sensor = sensor - self.samples = samples - def __enter__(self) -> "MessageReader": + def read(self) -> Reading: + if not hasattr(self, "data"): + raise StopIteration + + row = next(self.data) + time, message = int(row["time"]), bytes.fromhex(row["hex"]) + obs = self.sensor.decode(message, time=time) + return Reading(buffer=message, obs_data=obs) + + def open(self) -> None: logger.debug(f"open {self.path}") self.csv = self.path.open() reader = DictReader(self.csv) self.data = (row for row in reader if row["sensor"] == self.sensor.name) - return self - def __exit__(self, exception_type, exception_value, traceback) -> None: + def close(self) -> None: logger.debug(f"close {self.path}") self.csv.close() - def __call__(self, *, raw: Optional[bool] = None): - if not hasattr(self, "data"): - return - for row in self.data: - time, message = int(row["time"]), bytes.fromhex(row["hex"]) - yield RawData(time, message) if raw else self.sensor.decode(message, time=time) - if self.samples: - self.samples -= 1 - if self.samples <= 0: - break +class MessageReader(Reader): + def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None: + super().__init__( + stream=MessageStream(path=path, sensor=sensor), + samples=samples, + ) @contextmanager diff --git a/tests/conftest.py b/tests/conftest.py index d5328bf..484f4cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -108,7 +108,7 @@ def capture_data(request) -> CapturedData: @pytest.fixture() def capture(monkeypatch, capture_data) -> CapturedData: - """mock pms.core.reader.Serial and some pms.core.reader.SensorReader internals""" + """mock pms.core.reader internals""" class MockSerial: port = None @@ -130,7 +130,7 @@ def reset_input_buffer(self): sensor = capture_data.sensor data = capture_data.data - def mock_reader__cmd(self, command: str) -> bytes: + def mock_stream__cmd(self, command: str) -> bytes: """bypass serial.write/read""" logger.debug(f"mock write/read: {command}") # nonlocal data @@ -141,12 +141,12 @@ def mock_reader__cmd(self, command: str) -> bytes: return b"" - monkeypatch.setattr("pms.core.reader.SensorReader._cmd", mock_reader__cmd) + monkeypatch.setattr("pms.core.reader.SensorStream._cmd", mock_stream__cmd) def mock_reader__pre_heat(self): pass - monkeypatch.setattr("pms.core.reader.SensorReader._pre_heat", mock_reader__pre_heat) + monkeypatch.setattr("pms.core.reader.SensorStream._pre_heat", mock_reader__pre_heat) def mock_sensor_check(self, buffer: bytes, command: str) -> bool: """don't check if message matches sensor""" diff --git a/tests/core/test_reader.py b/tests/core/test_reader.py index 34d9dbf..2a849b5 100644 --- a/tests/core/test_reader.py +++ b/tests/core/test_reader.py @@ -107,6 +107,31 @@ def passive_read(n): ) +@pytest.fixture +def mock_sensor_temp_failure(mock_serial): + def passive_read(n): + if n == 1: + # first return garbage data (bad checksum) + return ( + b"BM\x00\x1c" # expected header + + b"\0" * 26 # payload (to total 32 bytes) + + b"\x00\xFF" # checksum + ) + else: + # then behave like the original stub again + return ( + b"BM\x00\x1c" # expected header + + b".........................." # payload + + b"\x05W" # checksum + ) + + mock_serial.stub( + name="passive_read", + receive_bytes=b"BM\xe2\x00\x00\x01q", + send_fn=passive_read, + ) + + @pytest.fixture def sensor_reader_factory(monkeypatch, mock_sensor): def factory( @@ -125,7 +150,7 @@ def factory( # https://github.com/pyserial/pyserial/issues/625 monkeypatch.setattr( - sensor_reader.serial, + sensor_reader.stream.serial, "flush", lambda: None, ) @@ -179,7 +204,7 @@ def test_sensor_reader_preheat(sensor_reader_factory, mock_sleep): sensor_reader = sensor_reader_factory() # override pre heat duration - sensor_reader.pre_heat = 5 + sensor_reader.stream.pre_heat = 5 with sensor_reader as r: pass @@ -204,6 +229,23 @@ def test_sensor_reader_warm_up( assert len(obs) == 1 +def test_sensor_reader_temp_failure( + mock_sensor, + sensor_reader_factory, + mock_sensor_temp_failure, +): + sensor_reader = sensor_reader_factory() + + with sensor_reader as r: + obs = list(r()) + + # check one sample still acquired + assert len(obs) == 1 + + # check two samples were attempted + assert mock_sensor.stubs["passive_read"].calls == 2 + + def test_sensor_reader_sensor_mismatch(mock_sensor, sensor_reader_factory): sensor_reader = sensor_reader_factory()