From 11f870c88f53e755190ef542725c09c42f510c9c Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sun, 6 Nov 2022 10:59:13 +0000 Subject: [PATCH 1/7] Introduce Reading type to DRY-up reader internals Reading DRYs-up knowledge of how to construct "RawData". Since Reader classes don't know how to construct "ObsData", it makes sense that the knowledge of how to construct "RawData" is also abstracted away. The new type will also be heavily used in the following commits, as we introduce a new interface to read from sensors. In future, the new type could replace "RawData" and "ObsData" as a standard interface. --- src/pms/core/reader.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 1cabf5d..28d3877 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -11,6 +11,7 @@ from abc import abstractmethod from contextlib import AbstractContextManager, contextmanager from csv import DictReader +from dataclasses import dataclass from pathlib import Path from textwrap import wrap from typing import Iterator, NamedTuple, Optional, Union, overload @@ -50,6 +51,20 @@ def hexdump(self, line: Optional[int] = None) -> str: return f"{offset:08x}: {hex} {dump}" +@dataclass +class Reading: + buffer: bytes + obs_data: ObsData + + @property + def raw_data(self) -> RawData: + return RawData(self.time, self.buffer) + + @property + def time(self) -> int: + return self.obs_data.time + + class Reader(AbstractContextManager): @abstractmethod def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]: @@ -162,6 +177,7 @@ def __call__(self, *, raw: Optional[bool] = None): try: obs = self.sensor.decode(buffer) + reading = Reading(buffer=buffer, obs_data=obs) except (SensorWarmingUp, InconsistentObservation) as e: logger.debug(e) time.sleep(5) @@ -169,12 +185,12 @@ def __call__(self, *, raw: Optional[bool] = None): logger.debug(e) self.serial.reset_input_buffer() else: - yield RawData(obs.time, buffer) if raw else obs + yield reading.raw_data if raw else reading.obs_data sample += 1 if self.samples is not None and sample >= self.samples: break if self.interval: - delay = self.interval - (time.time() - obs.time) + delay = self.interval - (time.time() - reading.time) if delay > 0: time.sleep(delay) except KeyboardInterrupt: # pragma: no cover @@ -205,7 +221,9 @@ def __call__(self, *, raw: Optional[bool] = None): for row in self.data: time, message = int(row["time"]), bytes.fromhex(row["hex"]) - yield RawData(time, message) if raw else self.sensor.decode(message, time=time) + obs = self.sensor.decode(message, time=time) + reading = Reading(buffer=message, obs_data=obs) + yield reading.raw_data if raw else reading.obs_data if self.samples: self.samples -= 1 if self.samples <= 0: From 2d2bd47348d55c2a5edd4264c71a8266f2ed8bb5 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sun, 6 Nov 2022 11:10:33 +0000 Subject: [PATCH 2/7] Separate reading single values from all values This is part of adapting the reader module to support its use as a library. Previously it was only possible to read *all* the data in one go, which meant that error handling was hard to customise. SensorReader.read_one uses StopIteration to indicate there are no more readings. The choice of exception is to be consistent with the MessageReader, but a custom one could potentially be used. --- src/pms/core/reader.py | 64 ++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 28d3877..66000c2 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -134,6 +134,24 @@ def _pre_heat(self): # only pre-heat the firs time self.pre_heat = 0 + def read_one(self) -> Reading: + """Return a single passive mode reading""" + + if not self.serial.is_open: + raise StopIteration + + buffer = self._cmd("passive_read") + + try: + obs = self.sensor.decode(buffer) + return Reading(buffer=buffer, obs_data=obs) + except (SensorWarmingUp, InconsistentObservation) as e: + # no special hardware handling + raise + except SensorWarning as e: # pragma: no cover + self.serial.reset_input_buffer() + raise + def __enter__(self) -> "SensorReader": """Open serial port and sensor setup""" if not self.serial.is_open: @@ -171,19 +189,15 @@ def __call__(self, *, raw: Optional[bool] = None): """Passive mode reading at regular intervals""" sample = 0 - while self.serial.is_open: - try: - buffer = self._cmd("passive_read") - + try: + while True: try: - obs = self.sensor.decode(buffer) - reading = Reading(buffer=buffer, obs_data=obs) + reading = self.read_one() except (SensorWarmingUp, InconsistentObservation) as e: logger.debug(e) time.sleep(5) except SensorWarning as e: # pragma: no cover logger.debug(e) - self.serial.reset_input_buffer() else: yield reading.raw_data if raw else reading.obs_data sample += 1 @@ -193,9 +207,10 @@ def __call__(self, *, raw: Optional[bool] = None): delay = self.interval - (time.time() - reading.time) if delay > 0: time.sleep(delay) - except KeyboardInterrupt: # pragma: no cover - print() - break + except KeyboardInterrupt: # pragma: no cover + print() + except StopIteration: + pass class MessageReader(Reader): @@ -204,6 +219,15 @@ def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> self.sensor = sensor self.samples = samples + def read_one(self) -> Reading: + if not hasattr(self, "data"): + raise StopIteration + + row = next(self.data) + time, message = int(row["time"]), bytes.fromhex(row["hex"]) + obs = self.sensor.decode(message, time=time) + return Reading(buffer=message, obs_data=obs) + def __enter__(self) -> "MessageReader": logger.debug(f"open {self.path}") self.csv = self.path.open() @@ -216,19 +240,17 @@ def __exit__(self, exception_type, exception_value, traceback) -> None: self.csv.close() def __call__(self, *, raw: Optional[bool] = None): - if not hasattr(self, "data"): + try: + while True: + reading = self.read_one() + yield reading.raw_data if raw else reading.obs_data + if self.samples: + self.samples -= 1 + if self.samples <= 0: + break + except StopIteration: return - for row in self.data: - time, message = int(row["time"]), bytes.fromhex(row["hex"]) - obs = self.sensor.decode(message, time=time) - reading = Reading(buffer=message, obs_data=obs) - yield reading.raw_data if raw else reading.obs_data - if self.samples: - self.samples -= 1 - if self.samples <= 0: - break - @contextmanager def exit_on_fail(reader: Reader): From 6343ca06c8488c43ca68cb1b70331e9d7636b776 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sun, 20 Nov 2022 21:59:44 +0000 Subject: [PATCH 3/7] Add test to cover sensor reader temporary failure This will give us more confidence when we refactor these exception handlers in the next commits. --- src/pms/core/reader.py | 2 +- tests/core/test_reader.py | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 66000c2..20f7ea8 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -196,7 +196,7 @@ def __call__(self, *, raw: Optional[bool] = None): except (SensorWarmingUp, InconsistentObservation) as e: logger.debug(e) time.sleep(5) - except SensorWarning as e: # pragma: no cover + except SensorWarning as e: logger.debug(e) else: yield reading.raw_data if raw else reading.obs_data diff --git a/tests/core/test_reader.py b/tests/core/test_reader.py index 34d9dbf..e9ef886 100644 --- a/tests/core/test_reader.py +++ b/tests/core/test_reader.py @@ -107,6 +107,31 @@ def passive_read(n): ) +@pytest.fixture +def mock_sensor_temp_failure(mock_serial): + def passive_read(n): + if n == 1: + # first return garbage data (bad checksum) + return ( + b"BM\x00\x1c" # expected header + + b"\0" * 26 # payload (to total 32 bytes) + + b"\x00\xFF" # checksum + ) + else: + # then behave like the original stub again + return ( + b"BM\x00\x1c" # expected header + + b".........................." # payload + + b"\x05W" # checksum + ) + + mock_serial.stub( + name="passive_read", + receive_bytes=b"BM\xe2\x00\x00\x01q", + send_fn=passive_read, + ) + + @pytest.fixture def sensor_reader_factory(monkeypatch, mock_sensor): def factory( @@ -204,6 +229,23 @@ def test_sensor_reader_warm_up( assert len(obs) == 1 +def test_sensor_reader_temp_failure( + mock_sensor, + sensor_reader_factory, + mock_sensor_temp_failure, +): + sensor_reader = sensor_reader_factory() + + with sensor_reader as r: + obs = list(r()) + + # check one sample still acquired + assert len(obs) == 1 + + # check two samples were attempted + assert mock_sensor.stubs["passive_read"].calls == 2 + + def test_sensor_reader_sensor_mismatch(mock_sensor, sensor_reader_factory): sensor_reader = sensor_reader_factory() From 7cca8f99a58b5ac544b9896284ee3857e3694783 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sat, 19 Nov 2022 22:19:32 +0000 Subject: [PATCH 4/7] Simplify exception handling for SensorReader This clarifies that both exceptions relate to a temporary condition where it makes sense to wait before trying again. --- src/pms/__init__.py | 10 ++++++++-- src/pms/core/reader.py | 6 +++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/pms/__init__.py b/src/pms/__init__.py index 4549e10..cd8635f 100644 --- a/src/pms/__init__.py +++ b/src/pms/__init__.py @@ -21,13 +21,19 @@ class WrongMessageChecksum(SensorWarning): pass -class SensorWarmingUp(SensorWarning): +class SensorNotReady(SensorWarning): + """Sensor not ready to read: observations not reliable""" + + pass + + +class SensorWarmingUp(SensorNotReady): """Empty message: throw away observation and wait until sensor warms up""" pass -class InconsistentObservation(SensorWarning): +class InconsistentObservation(SensorNotReady): """Message payload makes no sense: throw away observation""" pass diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 20f7ea8..dd0d3b3 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -19,7 +19,7 @@ from serial import Serial from typer import progressbar -from pms import InconsistentObservation, SensorWarmingUp, SensorWarning, logger +from pms import SensorNotReady, SensorWarning, logger from pms.core import Sensor, Supported from .types import ObsData @@ -145,7 +145,7 @@ def read_one(self) -> Reading: try: obs = self.sensor.decode(buffer) return Reading(buffer=buffer, obs_data=obs) - except (SensorWarmingUp, InconsistentObservation) as e: + except SensorNotReady as e: # no special hardware handling raise except SensorWarning as e: # pragma: no cover @@ -193,7 +193,7 @@ def __call__(self, *, raw: Optional[bool] = None): while True: try: reading = self.read_one() - except (SensorWarmingUp, InconsistentObservation) as e: + except SensorNotReady as e: logger.debug(e) time.sleep(5) except SensorWarning as e: From 05f2992e1eae0bd5ae1339a945fc2e3876a460b9 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sun, 6 Nov 2022 11:32:49 +0000 Subject: [PATCH 5/7] Change readers to be wrapper classes of streams This makes it easier for a library to implement its own polling scheme. Streams are lower level than Readers, hence the granular Stream.open and Stream.close API instead of a context manager. Implementation notes: - Reinstating a type hint for __enter__ of "-> Self" will become possible in Python 3.11. - SensorReader.sensor property is necessary as it's used outside the class by the CLI [^1]. [^1]: https://github.com/benthorner/PyPMS/blob/f8d6d59a514aca757c0f8dd7d0f5b4b9b84fa2d5/src/pms/cli.py#L60 --- src/pms/core/__init__.py | 9 ++- src/pms/core/reader.py | 114 ++++++++++++++++++++++++++++---------- tests/conftest.py | 8 +-- tests/core/test_reader.py | 4 +- 4 files changed, 100 insertions(+), 35 deletions(-) diff --git a/src/pms/core/__init__.py b/src/pms/core/__init__.py index 10a9dfa..b1499c7 100644 --- a/src/pms/core/__init__.py +++ b/src/pms/core/__init__.py @@ -1,2 +1,9 @@ from .sensor import Sensor, Supported # isort: skip -from .reader import MessageReader, SensorReader, UnableToRead, exit_on_fail +from .reader import ( + MessageReader, + MessageStream, + SensorReader, + SensorStream, + UnableToRead, + exit_on_fail, +) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index dd0d3b3..5eb2186 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -9,7 +9,7 @@ import sys import time from abc import abstractmethod -from contextlib import AbstractContextManager, contextmanager +from contextlib import contextmanager from csv import DictReader from dataclasses import dataclass from pathlib import Path @@ -65,7 +65,28 @@ def time(self) -> int: return self.obs_data.time -class Reader(AbstractContextManager): +class Stream: + """ + Standard interface to read data. + """ + + @abstractmethod + def read(self) -> Reading: + ... + + @abstractmethod + def open(self) -> None: + ... + + @abstractmethod + def close(self) -> None: + ... + + +class Reader: + def __init__(self, *, stream: Stream) -> None: + self.stream = stream + @abstractmethod def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]: """ @@ -75,12 +96,16 @@ def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]: """ ... + def __enter__(self): + self.stream.open() + return self -class SensorReader(Reader): - """Read sensor messages from serial port + def __exit__(self, *args) -> None: + self.stream.close() - The sensor is woken up after opening the serial port, and put to sleep when before closing the port. - While the serial port is open, the sensor is read in passive mode. + +class SensorStream(Stream): + """Read sensor messages from serial port PMS3003 sensors do not accept serial commands, such as wake/sleep or passive mode read. Valid messages are extracted from the serial buffer. @@ -88,10 +113,9 @@ class SensorReader(Reader): def __init__( self, + *, sensor: Union[Sensor, Supported, str] = Supported.default, port: str = "/dev/ttyUSB0", - interval: Optional[int] = None, - samples: Optional[int] = None, timeout: Optional[float] = None, ) -> None: """Configure serial port""" @@ -101,12 +125,6 @@ def __init__( self.serial.port = port self.serial.baudrate = self.sensor.baud self.serial.timeout = timeout or 5 # max time to wake up sensor - self.interval = interval - self.samples = samples - logger.debug( - f"capture {samples if samples else '?'} {sensor} obs " - f"from {port} every {interval if interval else '?'} secs" - ) def _cmd(self, command: str) -> bytes: """Write command to sensor and return answer""" @@ -134,7 +152,7 @@ def _pre_heat(self): # only pre-heat the firs time self.pre_heat = 0 - def read_one(self) -> Reading: + def read(self) -> Reading: """Return a single passive mode reading""" if not self.serial.is_open: @@ -152,7 +170,7 @@ def read_one(self) -> Reading: self.serial.reset_input_buffer() raise - def __enter__(self) -> "SensorReader": + def open(self) -> None: """Open serial port and sensor setup""" if not self.serial.is_open: logger.debug(f"open {self.serial.port}") @@ -176,15 +194,48 @@ def __enter__(self) -> "SensorReader": logger.error(f"Sensor is not {self.sensor.name}") raise UnableToRead("Sensor failed validation") - return self - - def __exit__(self, exception_type, exception_value, traceback) -> None: + def close(self) -> None: """Put sensor to sleep and close serial port""" logger.debug(f"sleep {self.sensor}") buffer = self._cmd("sleep") logger.debug(f"close {self.serial.port}") self.serial.close() + +class SensorReader(Reader): + """Read sensor messages from serial port + + The sensor is woken up after opening the serial port, and put to sleep when before closing the port. + While the serial port is open, the sensor is read in passive mode. + """ + + def __init__( + self, + sensor: Union[Sensor, Supported, str] = Supported.default, + port: str = "/dev/ttyUSB0", + interval: Optional[int] = None, + samples: Optional[int] = None, + timeout: Optional[float] = None, + ) -> None: + super().__init__( + stream=SensorStream( + sensor=sensor, + port=port, + timeout=timeout, + ) + ) + + self.interval = interval + self.samples = samples + logger.debug( + f"capture {samples if samples else '?'} {sensor} obs " + f"from {port} every {interval if interval else '?'} secs" + ) + + @property + def sensor(self): + return self.stream.sensor + def __call__(self, *, raw: Optional[bool] = None): """Passive mode reading at regular intervals""" @@ -192,7 +243,7 @@ def __call__(self, *, raw: Optional[bool] = None): try: while True: try: - reading = self.read_one() + reading = self.stream.read() except SensorNotReady as e: logger.debug(e) time.sleep(5) @@ -213,13 +264,12 @@ def __call__(self, *, raw: Optional[bool] = None): pass -class MessageReader(Reader): - def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None: +class MessageStream(Stream): + def __init__(self, *, path: Path, sensor: Sensor) -> None: self.path = path self.sensor = sensor - self.samples = samples - def read_one(self) -> Reading: + def read(self) -> Reading: if not hasattr(self, "data"): raise StopIteration @@ -228,21 +278,29 @@ def read_one(self) -> Reading: obs = self.sensor.decode(message, time=time) return Reading(buffer=message, obs_data=obs) - def __enter__(self) -> "MessageReader": + def open(self) -> None: logger.debug(f"open {self.path}") self.csv = self.path.open() reader = DictReader(self.csv) self.data = (row for row in reader if row["sensor"] == self.sensor.name) - return self - def __exit__(self, exception_type, exception_value, traceback) -> None: + def close(self) -> None: logger.debug(f"close {self.path}") self.csv.close() + +class MessageReader(Reader): + def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None: + super().__init__( + stream=MessageStream(path=path, sensor=sensor), + ) + + self.samples = samples + def __call__(self, *, raw: Optional[bool] = None): try: while True: - reading = self.read_one() + reading = self.stream.read() yield reading.raw_data if raw else reading.obs_data if self.samples: self.samples -= 1 diff --git a/tests/conftest.py b/tests/conftest.py index d5328bf..484f4cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -108,7 +108,7 @@ def capture_data(request) -> CapturedData: @pytest.fixture() def capture(monkeypatch, capture_data) -> CapturedData: - """mock pms.core.reader.Serial and some pms.core.reader.SensorReader internals""" + """mock pms.core.reader internals""" class MockSerial: port = None @@ -130,7 +130,7 @@ def reset_input_buffer(self): sensor = capture_data.sensor data = capture_data.data - def mock_reader__cmd(self, command: str) -> bytes: + def mock_stream__cmd(self, command: str) -> bytes: """bypass serial.write/read""" logger.debug(f"mock write/read: {command}") # nonlocal data @@ -141,12 +141,12 @@ def mock_reader__cmd(self, command: str) -> bytes: return b"" - monkeypatch.setattr("pms.core.reader.SensorReader._cmd", mock_reader__cmd) + monkeypatch.setattr("pms.core.reader.SensorStream._cmd", mock_stream__cmd) def mock_reader__pre_heat(self): pass - monkeypatch.setattr("pms.core.reader.SensorReader._pre_heat", mock_reader__pre_heat) + monkeypatch.setattr("pms.core.reader.SensorStream._pre_heat", mock_reader__pre_heat) def mock_sensor_check(self, buffer: bytes, command: str) -> bool: """don't check if message matches sensor""" diff --git a/tests/core/test_reader.py b/tests/core/test_reader.py index e9ef886..2a849b5 100644 --- a/tests/core/test_reader.py +++ b/tests/core/test_reader.py @@ -150,7 +150,7 @@ def factory( # https://github.com/pyserial/pyserial/issues/625 monkeypatch.setattr( - sensor_reader.serial, + sensor_reader.stream.serial, "flush", lambda: None, ) @@ -204,7 +204,7 @@ def test_sensor_reader_preheat(sensor_reader_factory, mock_sleep): sensor_reader = sensor_reader_factory() # override pre heat duration - sensor_reader.pre_heat = 5 + sensor_reader.stream.pre_heat = 5 with sensor_reader as r: pass From aa575484171bf9d672715aabd78c9e1c07601dca Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sat, 19 Nov 2022 22:14:53 +0000 Subject: [PATCH 6/7] Add library documentation about Stream API usage --- docs/examples/sensor_stream.out | 6 ++++++ docs/examples/sensor_stream.py | 17 +++++++++++++++++ docs/library_usage.md | 10 ++++++++++ 3 files changed, 33 insertions(+) create mode 100644 docs/examples/sensor_stream.out create mode 100644 docs/examples/sensor_stream.py diff --git a/docs/examples/sensor_stream.out b/docs/examples/sensor_stream.out new file mode 100644 index 0000000..032406c --- /dev/null +++ b/docs/examples/sensor_stream.out @@ -0,0 +1,6 @@ + +PMSx003 4 samples on default format +2021-07-29 15:57:01: PM1 1.0, PM2.5 11.0, PM10 12.0 μg/m3 +2021-07-29 15:57:21: PM1 0.0, PM2.5 6.0, PM10 6.0 μg/m3 +2021-07-29 15:57:41: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3 +2021-07-29 15:58:01: PM1 0.0, PM2.5 1.0, PM10 2.0 μg/m3 diff --git a/docs/examples/sensor_stream.py b/docs/examples/sensor_stream.py new file mode 100644 index 0000000..2fc8219 --- /dev/null +++ b/docs/examples/sensor_stream.py @@ -0,0 +1,17 @@ +""" +Read PMSx003 sensor on /dev/ttyUSB0. + +Use a low-level Stream class for more granular +control of the sensor. +""" + +from pms.core import SensorStream + +stream = SensorStream(sensor="PMSx003", port="/dev/ttyUSB0") + +print("\nPMSx003 4 samples on default format") + +stream.open() +for _ in range(4): + print(stream.read()) +stream.close() diff --git a/docs/library_usage.md b/docs/library_usage.md index 7efc3a1..e9a7974 100644 --- a/docs/library_usage.md +++ b/docs/library_usage.md @@ -27,6 +27,16 @@ This section contain some help for those brave enough to use its internals as a --8<-- "read_two_sensors.out" ``` +=== "sensor stream (low-level API)" + + ``` python + --8<-- "sensor_stream.py" + ``` + + ``` + --8<-- "sensor_stream.out" + ``` + ## Observation data fields From 4db6fb45fd5421f74861b42ce43bba4b963b2905 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Sat, 19 Nov 2022 22:34:11 +0000 Subject: [PATCH 7/7] DRY-up Reader.__call__ using custom exceptions This takes advantage of the new Stream classes to move towards a single Reader class: SensorReader and MessageReader are now largely the same except for the stream they use. Using a common definition of Reader has the advantage of keeping their behaviour consistent. Implementation notes: - The sample counting code looks different between the two readers but actually behaves the same way except that MessageReader would only count once - at the class level. - The "interval" default of "None" means MessageReader will continue to behave as it did before, although in future it could of course make use of this feature if needed. - The two new "Stream" exceptions represent a layer of abstraction that makes it possible for Reader.__call__ to make generic decisions about Streams, rather than just sensors. --- src/pms/core/reader.py | 99 ++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/src/pms/core/reader.py b/src/pms/core/reader.py index 5eb2186..d7239b2 100644 --- a/src/pms/core/reader.py +++ b/src/pms/core/reader.py @@ -34,6 +34,14 @@ class UnableToRead(Exception): pass +class StreamNotReady(Exception): + pass + + +class TemporaryFailure(Exception): + pass + + class RawData(NamedTuple): """raw messages with timestamp""" @@ -84,17 +92,47 @@ def close(self) -> None: class Reader: - def __init__(self, *, stream: Stream) -> None: + def __init__( + self, + *, + stream: Stream, + interval: Optional[int] = None, + samples: Optional[int] = None, + ) -> None: + self.interval = interval + self.samples = samples self.stream = stream - @abstractmethod - def __call__(self, *, raw: Optional[bool]) -> Iterator[Union[RawData, ObsData]]: + def __call__(self, *, raw: Optional[bool] = None) -> Iterator[Union[RawData, ObsData]]: """ Return an iterator of ObsData. If "raw" is set to True, then ObsData is replaced with RawData. """ - ... + + sample = 0 + try: + while True: + try: + reading = self.stream.read() + except StreamNotReady as e: + logger.debug(e) + time.sleep(5) + except TemporaryFailure as e: + logger.debug(e) + else: + yield reading.raw_data if raw else reading.obs_data + sample += 1 + if self.samples is not None and sample >= self.samples: + break + if self.interval: + delay = self.interval - (time.time() - reading.time) + if delay > 0: + time.sleep(delay) + except KeyboardInterrupt: # pragma: no cover + print() + except StopIteration: + pass def __enter__(self): self.stream.open() @@ -165,10 +203,10 @@ def read(self) -> Reading: return Reading(buffer=buffer, obs_data=obs) except SensorNotReady as e: # no special hardware handling - raise + raise StreamNotReady except SensorWarning as e: # pragma: no cover self.serial.reset_input_buffer() - raise + raise TemporaryFailure def open(self) -> None: """Open serial port and sensor setup""" @@ -222,11 +260,10 @@ def __init__( sensor=sensor, port=port, timeout=timeout, - ) + ), + samples=samples, + interval=interval, ) - - self.interval = interval - self.samples = samples logger.debug( f"capture {samples if samples else '?'} {sensor} obs " f"from {port} every {interval if interval else '?'} secs" @@ -236,33 +273,6 @@ def __init__( def sensor(self): return self.stream.sensor - def __call__(self, *, raw: Optional[bool] = None): - """Passive mode reading at regular intervals""" - - sample = 0 - try: - while True: - try: - reading = self.stream.read() - except SensorNotReady as e: - logger.debug(e) - time.sleep(5) - except SensorWarning as e: - logger.debug(e) - else: - yield reading.raw_data if raw else reading.obs_data - sample += 1 - if self.samples is not None and sample >= self.samples: - break - if self.interval: - delay = self.interval - (time.time() - reading.time) - if delay > 0: - time.sleep(delay) - except KeyboardInterrupt: # pragma: no cover - print() - except StopIteration: - pass - class MessageStream(Stream): def __init__(self, *, path: Path, sensor: Sensor) -> None: @@ -293,22 +303,9 @@ class MessageReader(Reader): def __init__(self, path: Path, sensor: Sensor, samples: Optional[int] = None) -> None: super().__init__( stream=MessageStream(path=path, sensor=sensor), + samples=samples, ) - self.samples = samples - - def __call__(self, *, raw: Optional[bool] = None): - try: - while True: - reading = self.stream.read() - yield reading.raw_data if raw else reading.obs_data - if self.samples: - self.samples -= 1 - if self.samples <= 0: - break - except StopIteration: - return - @contextmanager def exit_on_fail(reader: Reader):