From 8111404bc7db16a6ffdd75a3c02c41f52988007c Mon Sep 17 00:00:00 2001 From: Ian Flournoy Date: Fri, 16 Apr 2021 20:09:56 -0400 Subject: [PATCH 1/4] Add initial version of command library and basic nceserial driver commands --- bdio/brain/brain.py | 13 ++++ bdio/brain/nceserial/__init__.py | 0 bdio/brain/nceserial/commands.csv | 30 +++++++++ bdio/brain/nceserial/commands.py | 25 +++++++ bdio/brain/nceserial/driver.py | 67 +++++++++++++++++++ .../{nceserial_test.py => driver_test.py} | 4 +- bdio/brain/nceserial/nceserial.py | 37 ---------- 7 files changed, 137 insertions(+), 39 deletions(-) create mode 100644 bdio/brain/nceserial/__init__.py create mode 100644 bdio/brain/nceserial/commands.csv create mode 100644 bdio/brain/nceserial/commands.py create mode 100644 bdio/brain/nceserial/driver.py rename bdio/brain/nceserial/{nceserial_test.py => driver_test.py} (62%) delete mode 100644 bdio/brain/nceserial/nceserial.py diff --git a/bdio/brain/brain.py b/bdio/brain/brain.py index c9aed7c..cbedc61 100644 --- a/bdio/brain/brain.py +++ b/bdio/brain/brain.py @@ -1,4 +1,17 @@ +from nceserial.driver import NceSerialDriver +from time import sleep + """RITMRC Layout Control and Signaling: Brain""" if __name__ == "__main__": # is this required for an app entrypoint? print("All aboard!") # Hello, world! + cmd_station = NceSerialDriver() + cmd_station.connect("/dev/ttyUSB0") + if cmd_station.is_connected(): + print("Software version: " + cmd_station.version_formatted()) + print("Time: " + cmd_station.read_clock_formatted()) + while True: + print("time is now " + cmd_station.read_clock_formatted()) + sleep(1) + else: + print("not connected") diff --git a/bdio/brain/nceserial/__init__.py b/bdio/brain/nceserial/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bdio/brain/nceserial/commands.csv b/bdio/brain/nceserial/commands.csv new file mode 100644 index 0000000..8a9b436 --- /dev/null +++ b/bdio/brain/nceserial/commands.csv @@ -0,0 +1,30 @@ +command,name,arg_size_bytes,res_size_bytes,res_options +80,noop ,0,1,! +81,assign_loco ,3,1,!|1|2 +82,clock_read ,0,2, +83,clock_stop ,0,1,! +84,clock_start ,0,1,! +85,clock_set ,2,1,!|3 +86,clock_swap_12_24 ,0,1,!|3 +87,clock_set_ratio ,1,1,!|3 +88,loco_dequeue_packet ,2,1,!|1|2 +89,use_main_track ,0,1,! +8A,aiu_status ,1,4, +8B,use_prog_track ,0,1,! +8C,dummy ,0,3,!|0D|0A +8D,loco_set_speed ,3,1,!|1|3|14|28|128 +8E,write_16 ,19,1,!|4 +8F,read_16 ,2,16, +# +97,write_16 ,3,1,! +98,write_2 ,4,1,! +99,write_4 ,6,1,! +9A,write_8 ,10,1,! +9B,aiu_status_short ,1,2, +9C,run_macro ,1,1,!|0|3 +9D,read_1 ,1,1, +9E,enter_prog_track_mode,0,1,!|3 +9F,exit_prog_track_mode ,0,1,! +# +AA,version ,0,3, +AB,soft_reset ,0,0, \ No newline at end of file diff --git a/bdio/brain/nceserial/commands.py b/bdio/brain/nceserial/commands.py new file mode 100644 index 0000000..56a61ab --- /dev/null +++ b/bdio/brain/nceserial/commands.py @@ -0,0 +1,25 @@ +import csv +import os + +commands = {} + +this_dir, this_filename = os.path.split(__file__) +commands_file = os.path.join(this_dir, "commands.csv") + +with open(commands_file) as csvfile: + reader = csv.reader(csvfile) + for row in reader: + if row[0] == "#": + continue + cmd, name, arg_size, res_size, res_options = row + if name == "name": + continue + name_upper = str.upper(str.rstrip(name)) + commands[name_upper] = { + "cmd": bytes.fromhex(cmd), + "arg_size": int(arg_size), + "res_size": int(res_size), + "responses": None, + } + if res_options: + commands[name_upper]["responses"] = res_options.split("|") diff --git a/bdio/brain/nceserial/driver.py b/bdio/brain/nceserial/driver.py new file mode 100644 index 0000000..13a0597 --- /dev/null +++ b/bdio/brain/nceserial/driver.py @@ -0,0 +1,67 @@ +import serial +from .commands import commands + +NCE_BAUD = 9600 +NCE_BYTESIZE = serial.EIGHTBITS +NCE_PARITY = serial.PARITY_NONE +NCE_STOPBITS = serial.STOPBITS_ONE +TIMEOUT_SEC = 1 + + +class NceSerialDriver: + """NCE Command Station Serial Interface Driver""" + + def __init__(self): + self.serial = serial.Serial( + baudrate=NCE_BAUD, + parity=NCE_PARITY, + stopbits=NCE_STOPBITS, + timeout=TIMEOUT_SEC, + ) + + def is_connected(self): + return self.serial.is_open and self.noop() + + def connect(self, port): + if self.is_connected(): + raise serial.SerialException("NCE Serial already connected") + if port: + self.serial.port = port + self.serial.open() + + def disconnect(self): + if not self.is_connected(): + raise serial.SerialException("NCE Serial not connected") + self.serial.close() + + def write(self, data): + self.serial.write(data) + + def read(self, bytecount): + return self.serial.read(bytecount) + + def run_command(self, command_name): + command = commands[command_name] + self.write(command["cmd"]) + return self.read(command["res_size"]) + + def noop(self): + return self.run_command("NOOP") + + def dummy(self): + return self.run_command("DUMMY") + + def read_clock(self): + """Read command station clock setting""" + return self.run_command("CLOCK_READ") + + def read_clock_formatted(self): + hour, minute = self.read_clock() + return f"{hour}:{minute}" + + def version(self): + return self.run_command("VERSION") + + def version_formatted(self): + VV, MM, mm = self.version() + return f"v{VV}.{MM}.{mm}" diff --git a/bdio/brain/nceserial/nceserial_test.py b/bdio/brain/nceserial/driver_test.py similarity index 62% rename from bdio/brain/nceserial/nceserial_test.py rename to bdio/brain/nceserial/driver_test.py index c40721d..7302433 100644 --- a/bdio/brain/nceserial/nceserial_test.py +++ b/bdio/brain/nceserial/driver_test.py @@ -1,7 +1,7 @@ from serial import Serial -from nceserial import NceSerial +from driver import NceSerialDriver def test_NceSerial_construction(): - nce = NceSerial() + nce = NceSerialDriver() assert isinstance(nce.serial, Serial) diff --git a/bdio/brain/nceserial/nceserial.py b/bdio/brain/nceserial/nceserial.py deleted file mode 100644 index a919c84..0000000 --- a/bdio/brain/nceserial/nceserial.py +++ /dev/null @@ -1,37 +0,0 @@ -import serial - -NCE_BAUD = 9600 -NCE_BYTESIZE = serial.EIGHTBITS -NCE_PARITY = serial.PARITY_NONE -NCE_STOPBITS = serial.STOPBITS_ONE -TIMEOUT_SEC = 1 - - -class NceSerial: - """NCE Command Station Serial Interface Driver""" - - def __init__(self): - self.serial = serial.Serial( - baudrate=NCE_BAUD, - parity=NCE_PARITY, - stopbits=NCE_STOPBITS, - timeout=TIMEOUT_SEC, - ) - - def is_connected(self): - return self.serial.is_open() - - def connect(self, port): - if self.is_connected(): - raise serial.SerialException("NCE Serial already connected") - if port: - self.serial.port = port - self.serial.open() - - def disconnect(self): - if not self.is_connected(): - raise serial.SerialException("NCE Serial not connected") - self.serial.close() - - def write(self, bytes): - return self From bdce2cd20c94445e2a42834cdcac4b3b6866fd8b Mon Sep 17 00:00:00 2001 From: Ian Flournoy Date: Sat, 17 Apr 2021 08:40:29 -0400 Subject: [PATCH 2/4] Add rope devdep for automated refactoring support --- Pipfile | 1 + Pipfile.lock | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/Pipfile b/Pipfile index a107174..619a3e9 100644 --- a/Pipfile +++ b/Pipfile @@ -14,6 +14,7 @@ pytest = "*" coverage = "*" pytest-cov = "*" pyclean = "*" +rope = "*" [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index f803e7d..74f502c 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e6545aa4fa1331a0fb84597868ae375627bd33f64474d29dc918eebc98e4a377" + "sha256": "ff095bf274584937237e9766467d68dd5824a44f338e7d5236965f0c28007908" }, "pipfile-spec": 6, "requires": { @@ -285,6 +285,13 @@ ], "version": "==2021.4.4" }, + "rope": { + "hashes": [ + "sha256:786b5c38c530d4846aa68a42604f61b4e69a493390e3ca11b88df0fbfdc3ed04" + ], + "index": "pypi", + "version": "==0.18.0" + }, "toml": { "hashes": [ "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", From 0a4c108a123e893b24612e3169c8a8d14d51674a Mon Sep 17 00:00:00 2001 From: Ian Flournoy Date: Sat, 17 Apr 2021 21:21:13 -0400 Subject: [PATCH 3/4] Add pytest-mock devdep --- Pipfile | 1 + Pipfile.lock | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Pipfile b/Pipfile index 619a3e9..d3b14ba 100644 --- a/Pipfile +++ b/Pipfile @@ -15,6 +15,7 @@ coverage = "*" pytest-cov = "*" pyclean = "*" rope = "*" +pytest-mock = "*" [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index 74f502c..3772b36 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "ff095bf274584937237e9766467d68dd5824a44f338e7d5236965f0c28007908" + "sha256": "1065c248d3077683811f618e3c38e722c899ed3b612e3d0ea099e9caa07813ba" }, "pipfile-spec": 6, "requires": { @@ -239,6 +239,14 @@ "index": "pypi", "version": "==2.11.1" }, + "pytest-mock": { + "hashes": [ + "sha256:379b391cfad22422ea2e252bdfc008edd08509029bcde3c25b2c0bd741e0424e", + "sha256:a1e2aba6af9560d313c642dae7e00a2a12b022b80301d9d7fc8ec6858e1dd9fc" + ], + "index": "pypi", + "version": "==3.5.1" + }, "regex": { "hashes": [ "sha256:01afaf2ec48e196ba91b37451aa353cb7eda77efe518e481707e0515025f0cd5", From 2a0604cae03179700e1e5b0e37bd77f975ba649c Mon Sep 17 00:00:00 2001 From: Ian Flournoy Date: Sat, 17 Apr 2021 21:21:50 -0400 Subject: [PATCH 4/4] WIP: aiu_status --- bdio/brain/brain.py | 15 ++++---- bdio/brain/nceserial/commander.py | 54 +++++++++++++++++++++++++++++ bdio/brain/nceserial/commands.csv | 2 +- bdio/brain/nceserial/commands.py | 42 +++++++++++++++------- bdio/brain/nceserial/driver.py | 39 ++++++--------------- bdio/brain/nceserial/driver_test.py | 29 ++++++++++++++-- bdio/brain/nceserial/errors.py | 9 +++++ bdio/brain/nceserial/util.py | 9 +++++ 8 files changed, 148 insertions(+), 51 deletions(-) create mode 100644 bdio/brain/nceserial/commander.py create mode 100644 bdio/brain/nceserial/errors.py create mode 100644 bdio/brain/nceserial/util.py diff --git a/bdio/brain/brain.py b/bdio/brain/brain.py index cbedc61..0a0baeb 100644 --- a/bdio/brain/brain.py +++ b/bdio/brain/brain.py @@ -1,17 +1,20 @@ from nceserial.driver import NceSerialDriver +from nceserial.commander import NceCommander from time import sleep """RITMRC Layout Control and Signaling: Brain""" if __name__ == "__main__": # is this required for an app entrypoint? print("All aboard!") # Hello, world! - cmd_station = NceSerialDriver() - cmd_station.connect("/dev/ttyUSB0") - if cmd_station.is_connected(): - print("Software version: " + cmd_station.version_formatted()) - print("Time: " + cmd_station.read_clock_formatted()) + driver = NceSerialDriver() + driver.connect("/dev/ttyUSB0") + cmd_station = NceCommander(driver) + if cmd_station.is_online(): + print("NCE Software version: " + cmd_station.version_formatted()) + print("Time: " + cmd_station.clock_formatted()) while True: - print("time is now " + cmd_station.read_clock_formatted()) + print(cmd_station.aiu_status(4)) sleep(1) else: print("not connected") + \ No newline at end of file diff --git a/bdio/brain/nceserial/commander.py b/bdio/brain/nceserial/commander.py new file mode 100644 index 0000000..05fbca9 --- /dev/null +++ b/bdio/brain/nceserial/commander.py @@ -0,0 +1,54 @@ +import struct +from .commands import COMMANDS, RESPONSE_VALUES +from .util import int2addr + +class NceCommander: + def __init__(self, driver): + self.driver = driver + + def is_online(self): + return self.driver.is_connected() and self.noop() + + def run_command(self, name): + try: + cmd, _, res_size, res_opts = COMMANDS[name] + except KeyError: + raise Exception(f"'{name}' is not a valid NCE command") + + result = self.driver.read_write(cmd, res_size) + if len(res_opts): + if result not in res_opts: + # TODO: map this better from error response code to + # an error class + raise Exception(f"response not in possible result set") + return result + + def clock(self): + """Read command station clock setting""" + return self.run_command("CLOCK_READ") + + def clock_formatted(self): + hour, minute = self.clock() + return f"{hour}:{minute}" + + def version(self): + return self.run_command("VERSION") + + def version_formatted(self): + VV, MM, mm = self.version() + return f"v{VV}.{MM}.{mm}" + + def noop(self): + return self.run_command("NOOP") + + def dummy(self): + return self.run_command("DUMMY") + + def aiu_status(self, address): + cmd = struct.pack(">BB", 0x8A, address) + result = self.driver.read_write(cmd, 4) + #return [byte >> i & 1 for i in range(8) for byte in result] + return ( + [int.from_bytes(result[0:2], byteorder='big') >> i & 1 for i in range(16)], + [int.from_bytes(result[2:4], byteorder='big') >> i & 1 for i in range(16)] + ) diff --git a/bdio/brain/nceserial/commands.csv b/bdio/brain/nceserial/commands.csv index 8a9b436..485a5f8 100644 --- a/bdio/brain/nceserial/commands.csv +++ b/bdio/brain/nceserial/commands.csv @@ -11,7 +11,7 @@ command,name,arg_size_bytes,res_size_bytes,res_options 89,use_main_track ,0,1,! 8A,aiu_status ,1,4, 8B,use_prog_track ,0,1,! -8C,dummy ,0,3,!|0D|0A +8C,dummy ,0,3,! 8D,loco_set_speed ,3,1,!|1|3|14|28|128 8E,write_16 ,19,1,!|4 8F,read_16 ,2,16, diff --git a/bdio/brain/nceserial/commands.py b/bdio/brain/nceserial/commands.py index 56a61ab..a751765 100644 --- a/bdio/brain/nceserial/commands.py +++ b/bdio/brain/nceserial/commands.py @@ -1,25 +1,43 @@ import csv import os +from pprint import pprint -commands = {} +COMMANDS = {} +RESPONSES = { + "SUCCESS": b"!", + "UNSUPPORTED_COMMAND": b"0", + "ADDRESS_RANGE_ERROR": b"1", + "CAB_OP_RANGE_ERROR": b"2", + "DATA_RANGE_ERROR": b"3", + "BYTE_COUNT_RANGE_ERROR": b"4", + "SPEED_MODE_1": b"14", + "SPEED_MODE_2": b"28", + "SPEED_MODE_3": b"128", +} +RESPONSE_VALUES = {v: k for k, v in RESPONSES.items()} this_dir, this_filename = os.path.split(__file__) commands_file = os.path.join(this_dir, "commands.csv") + +def unpack_opts(field): + if not field: + return [] + options = field.split("|") + return list(map(lambda o: bytes(o, "ascii"), options)) + + with open(commands_file) as csvfile: reader = csv.reader(csvfile) for row in reader: if row[0] == "#": continue - cmd, name, arg_size, res_size, res_options = row - if name == "name": + if row[1] == "name": continue - name_upper = str.upper(str.rstrip(name)) - commands[name_upper] = { - "cmd": bytes.fromhex(cmd), - "arg_size": int(arg_size), - "res_size": int(res_size), - "responses": None, - } - if res_options: - commands[name_upper]["responses"] = res_options.split("|") + cmd = bytes.fromhex(row[0]) + name = str.upper(str.rstrip(row[1])) + arg_size = int(row[2]) + res_size = int(row[3]) + res_opts = unpack_opts(row[4]) + + COMMANDS[name] = (cmd, arg_size, res_size, res_opts) \ No newline at end of file diff --git a/bdio/brain/nceserial/driver.py b/bdio/brain/nceserial/driver.py index 13a0597..845cabc 100644 --- a/bdio/brain/nceserial/driver.py +++ b/bdio/brain/nceserial/driver.py @@ -1,5 +1,4 @@ import serial -from .commands import commands NCE_BAUD = 9600 NCE_BYTESIZE = serial.EIGHTBITS @@ -20,7 +19,7 @@ def __init__(self): ) def is_connected(self): - return self.serial.is_open and self.noop() + return self.serial.is_open def connect(self, port): if self.is_connected(): @@ -35,33 +34,15 @@ def disconnect(self): self.serial.close() def write(self, data): - self.serial.write(data) + #print(f"driver: writing - {data}") + return self.serial.write(data) - def read(self, bytecount): - return self.serial.read(bytecount) + def read(self, size): + result = self.serial.read(size) - def run_command(self, command_name): - command = commands[command_name] - self.write(command["cmd"]) - return self.read(command["res_size"]) + #print(f"driver: read {size} bytes - {result.hex()}") + return result - def noop(self): - return self.run_command("NOOP") - - def dummy(self): - return self.run_command("DUMMY") - - def read_clock(self): - """Read command station clock setting""" - return self.run_command("CLOCK_READ") - - def read_clock_formatted(self): - hour, minute = self.read_clock() - return f"{hour}:{minute}" - - def version(self): - return self.run_command("VERSION") - - def version_formatted(self): - VV, MM, mm = self.version() - return f"v{VV}.{MM}.{mm}" + def read_write(self, data, size): + self.write(data) + return self.read(size) \ No newline at end of file diff --git a/bdio/brain/nceserial/driver_test.py b/bdio/brain/nceserial/driver_test.py index 7302433..ef26b58 100644 --- a/bdio/brain/nceserial/driver_test.py +++ b/bdio/brain/nceserial/driver_test.py @@ -1,7 +1,30 @@ -from serial import Serial -from driver import NceSerialDriver +import pytest +from serial import Serial, SerialException +from .driver import NceSerialDriver -def test_NceSerial_construction(): +def test_construction(): nce = NceSerialDriver() assert isinstance(nce.serial, Serial) + + +def test_connect(mocker): + nce = NceSerialDriver() + mocker.patch("serial.Serial.open") + nce.connect("/dev/notexist") + assert nce.serial.port == "/dev/notexist" + + +def test_connect_fail(mocker): + nce = NceSerialDriver() + mocker.patch.object(nce, "is_connected", return_value=True) + with pytest.raises(SerialException) as excinfo: + nce.connect("port") + assert "already connected" in str(excinfo.value) + + +def test_is_connected(mocker): + nce = NceSerialDriver() + nce.serial.is_open = True + mocker.patch.object(nce, "noop", return_value=True) + assert nce.is_connected() \ No newline at end of file diff --git a/bdio/brain/nceserial/errors.py b/bdio/brain/nceserial/errors.py new file mode 100644 index 0000000..2d267c7 --- /dev/null +++ b/bdio/brain/nceserial/errors.py @@ -0,0 +1,9 @@ +class NceError(Exception): + """Base class for NCE exceptions in this module""" + + pass + + +class UnsupportedCommandError(NceError): + def __init__(self, command): + self.command = command \ No newline at end of file diff --git a/bdio/brain/nceserial/util.py b/bdio/brain/nceserial/util.py new file mode 100644 index 0000000..dc53fb7 --- /dev/null +++ b/bdio/brain/nceserial/util.py @@ -0,0 +1,9 @@ + +def int2addr(number): + """converts an integer to addr_hi and addr_lo bytes""" + return number >> 8, number & 255 + +def addr2int(addr): + """converts addr_hi and addr_lo into single number address""" + hi, lo = addr + return hi << 8 | lo & 255 \ No newline at end of file