Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ class ASCReader(BaseIOHandler):
"""
Iterator of CAN messages from a ASC logging file. Meta data (comments,
bus statistics, J1939 Transport Protocol messages) is ignored.

TODO: turn relative timestamps back to absolute form
"""

FORMAT_START_OF_FILE_DATE = "%a %b %d %I:%M:%S.%f %p %Y"

def __init__(
self,
file: Union[typechecking.FileLike, typechecking.StringPathLike],
base: str = "hex",
relative_timestamp: bool = True,
) -> None:
"""
:param file: a path-like object or as file-like object to read from
Expand All @@ -47,14 +48,19 @@ def __init__(
:param base: Select the base(hex or dec) of id and data.
If the header of the asc file contains base information,
this value will be overwritten. Default "hex".
:param relative_timestamp: Select whether the timestamps are
`relative` (starting at 0.0) or `absolute` (starting at
the system time). Default `True = relative`.
"""
super().__init__(file, mode="r")

if not self.file:
raise ValueError("The given file cannot be None")
self.base = base
self._converted_base = self._check_base(base)
self.relative_timestamp = relative_timestamp
self.date = None
# TODO - what is this used for? The ASC Writer only prints `absolute`
self.timestamps_format = None
self.internal_events_logged = None

Expand All @@ -74,6 +80,22 @@ def _extract_header(self):
self.timestamps_format = timestamp_format
elif lower_case.endswith("internal events logged"):
self.internal_events_logged = not lower_case.startswith("no")
elif lower_case.startswith("// version"):
# the test files include `// version 9.0.0` - not sure what this is
continue
# grab absolute timestamp
elif lower_case.startswith("begin triggerblock"):
try:
_, _, start_time = lower_case.split(None, 2)
start_time = datetime.strptime(
start_time, self.FORMAT_START_OF_FILE_DATE
).timestamp()
except ValueError:
start_time = 0.0
if self.relative_timestamp:
self.start_time = 0.0
else:
self.start_time = start_time
# Currently the last line in the header which is parsed
break
else:
Expand Down Expand Up @@ -191,7 +213,7 @@ def __iter__(self) -> Generator[Message, None, None]:
msg_kwargs = {}
try:
timestamp, channel, rest_of_message = temp.split(None, 2)
timestamp = float(timestamp)
timestamp = float(timestamp) + self.start_time
msg_kwargs["timestamp"] = timestamp
if channel == "CANFD":
msg_kwargs["is_fd"] = True
Expand Down
2 changes: 1 addition & 1 deletion test/data/test_CanMessage.asc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ date Sam Sep 30 15:06:13.191 2017
base hex timestamps absolute
internal events logged
// version 9.0.0
Begin Triggerblock Sam Sep 30 15:06:13.191 2017
Begin Triggerblock Sat Sep 30 10:06:13.191 PM 2017
0.000000 Start of measurement
2.5010 2 C8 Tx d 8 09 08 07 06 05 04 03 02
17.876708 1 6F9 Rx d 8 05 0C 00 00 00 00 00 00 Length = 240015 BitCount = 124 ID = 1785
Expand Down
35 changes: 33 additions & 2 deletions test/logformats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
from abc import abstractmethod, ABCMeta
from itertools import zip_longest
from datetime import datetime

import can

Expand Down Expand Up @@ -364,6 +365,8 @@ def assertIncludesComments(self, filename):
class TestAscFileFormat(ReaderWriterTest):
"""Tests can.ASCWriter and can.ASCReader"""

FORMAT_START_OF_FILE_DATE = "%a %b %d %I:%M:%S.%f %p %Y"

def _setup_instance(self):
super()._setup_instance_helper(
can.ASCWriter,
Expand All @@ -374,11 +377,39 @@ def _setup_instance(self):
adds_default_channel=0,
)

def _read_log_file(self, filename):
def _read_log_file(self, filename, **kwargs):
logfile = os.path.join(os.path.dirname(__file__), "data", filename)
with can.ASCReader(logfile) as reader:
with can.ASCReader(logfile, **kwargs) as reader:
return list(reader)

def test_absolute_time(self):
time_from_file = "Sat Sep 30 10:06:13.191 PM 2017"
start_time = datetime.strptime(
time_from_file, self.FORMAT_START_OF_FILE_DATE
).timestamp()

expected_messages = [
can.Message(
timestamp=2.5010 + start_time,
arbitration_id=0xC8,
is_extended_id=False,
is_rx=False,
channel=1,
dlc=8,
data=[9, 8, 7, 6, 5, 4, 3, 2],
),
can.Message(
timestamp=17.876708 + start_time,
arbitration_id=0x6F9,
is_extended_id=False,
channel=0,
dlc=0x8,
data=[5, 0xC, 0, 0, 0, 0, 0, 0],
),
]
actual = self._read_log_file("test_CanMessage.asc", relative_timestamp=False)
self.assertMessagesEqual(actual, expected_messages)

def test_can_message(self):
expected_messages = [
can.Message(
Expand Down