diff --git a/Pipfile b/Pipfile index a107174..d3b14ba 100644 --- a/Pipfile +++ b/Pipfile @@ -14,6 +14,8 @@ pytest = "*" coverage = "*" pytest-cov = "*" pyclean = "*" +rope = "*" +pytest-mock = "*" [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index f803e7d..3772b36 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e6545aa4fa1331a0fb84597868ae375627bd33f64474d29dc918eebc98e4a377" + "sha256": "1065c248d3077683811f618e3c38e722c899ed3b612e3d0ea099e9caa07813ba" }, "pipfile-spec": 6, "requires": { @@ -239,6 +239,14 @@ "index": "pypi", "version": "==2.11.1" }, + "pytest-mock": { + "hashes": [ + "sha256:379b391cfad22422ea2e252bdfc008edd08509029bcde3c25b2c0bd741e0424e", + "sha256:a1e2aba6af9560d313c642dae7e00a2a12b022b80301d9d7fc8ec6858e1dd9fc" + ], + "index": "pypi", + "version": "==3.5.1" + }, "regex": { "hashes": [ "sha256:01afaf2ec48e196ba91b37451aa353cb7eda77efe518e481707e0515025f0cd5", @@ -285,6 +293,13 @@ ], "version": "==2021.4.4" }, + "rope": { + "hashes": [ + "sha256:786b5c38c530d4846aa68a42604f61b4e69a493390e3ca11b88df0fbfdc3ed04" + ], + "index": "pypi", + "version": "==0.18.0" + }, "toml": { "hashes": [ "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", diff --git a/bdio/brain/brain.py b/bdio/brain/brain.py index c9aed7c..0a0baeb 100644 --- a/bdio/brain/brain.py +++ b/bdio/brain/brain.py @@ -1,4 +1,20 @@ +from nceserial.driver import NceSerialDriver +from nceserial.commander import NceCommander +from time import sleep + """RITMRC Layout Control and Signaling: Brain""" if __name__ == "__main__": # is this required for an app entrypoint? print("All aboard!") # Hello, world! + driver = NceSerialDriver() + driver.connect("/dev/ttyUSB0") + cmd_station = NceCommander(driver) + if cmd_station.is_online(): + print("NCE Software version: " + cmd_station.version_formatted()) + print("Time: " + cmd_station.clock_formatted()) + while True: + print(cmd_station.aiu_status(4)) + sleep(1) + else: + print("not connected") + \ No newline at end of file diff --git a/bdio/brain/nceserial/__init__.py b/bdio/brain/nceserial/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bdio/brain/nceserial/commander.py b/bdio/brain/nceserial/commander.py new file mode 100644 index 0000000..05fbca9 --- /dev/null +++ b/bdio/brain/nceserial/commander.py @@ -0,0 +1,54 @@ +import struct +from .commands import COMMANDS, RESPONSE_VALUES +from .util import int2addr + +class NceCommander: + def __init__(self, driver): + self.driver = driver + + def is_online(self): + return self.driver.is_connected() and self.noop() + + def run_command(self, name): + try: + cmd, _, res_size, res_opts = COMMANDS[name] + except KeyError: + raise Exception(f"'{name}' is not a valid NCE command") + + result = self.driver.read_write(cmd, res_size) + if len(res_opts): + if result not in res_opts: + # TODO: map this better from error response code to + # an error class + raise Exception(f"response not in possible result set") + return result + + def clock(self): + """Read command station clock setting""" + return self.run_command("CLOCK_READ") + + def clock_formatted(self): + hour, minute = self.clock() + return f"{hour}:{minute}" + + def version(self): + return self.run_command("VERSION") + + def version_formatted(self): + VV, MM, mm = self.version() + return f"v{VV}.{MM}.{mm}" + + def noop(self): + return self.run_command("NOOP") + + def dummy(self): + return self.run_command("DUMMY") + + def aiu_status(self, address): + cmd = struct.pack(">BB", 0x8A, address) + result = self.driver.read_write(cmd, 4) + #return [byte >> i & 1 for i in range(8) for byte in result] + return ( + [int.from_bytes(result[0:2], byteorder='big') >> i & 1 for i in range(16)], + [int.from_bytes(result[2:4], byteorder='big') >> i & 1 for i in range(16)] + ) diff --git a/bdio/brain/nceserial/commands.csv b/bdio/brain/nceserial/commands.csv new file mode 100644 index 0000000..485a5f8 --- /dev/null +++ b/bdio/brain/nceserial/commands.csv @@ -0,0 +1,30 @@ +command,name,arg_size_bytes,res_size_bytes,res_options +80,noop ,0,1,! +81,assign_loco ,3,1,!|1|2 +82,clock_read ,0,2, +83,clock_stop ,0,1,! +84,clock_start ,0,1,! +85,clock_set ,2,1,!|3 +86,clock_swap_12_24 ,0,1,!|3 +87,clock_set_ratio ,1,1,!|3 +88,loco_dequeue_packet ,2,1,!|1|2 +89,use_main_track ,0,1,! +8A,aiu_status ,1,4, +8B,use_prog_track ,0,1,! +8C,dummy ,0,3,! +8D,loco_set_speed ,3,1,!|1|3|14|28|128 +8E,write_16 ,19,1,!|4 +8F,read_16 ,2,16, +# +97,write_16 ,3,1,! +98,write_2 ,4,1,! +99,write_4 ,6,1,! +9A,write_8 ,10,1,! +9B,aiu_status_short ,1,2, +9C,run_macro ,1,1,!|0|3 +9D,read_1 ,1,1, +9E,enter_prog_track_mode,0,1,!|3 +9F,exit_prog_track_mode ,0,1,! +# +AA,version ,0,3, +AB,soft_reset ,0,0, \ No newline at end of file diff --git a/bdio/brain/nceserial/commands.py b/bdio/brain/nceserial/commands.py new file mode 100644 index 0000000..a751765 --- /dev/null +++ b/bdio/brain/nceserial/commands.py @@ -0,0 +1,43 @@ +import csv +import os +from pprint import pprint + +COMMANDS = {} +RESPONSES = { + "SUCCESS": b"!", + "UNSUPPORTED_COMMAND": b"0", + "ADDRESS_RANGE_ERROR": b"1", + "CAB_OP_RANGE_ERROR": b"2", + "DATA_RANGE_ERROR": b"3", + "BYTE_COUNT_RANGE_ERROR": b"4", + "SPEED_MODE_1": b"14", + "SPEED_MODE_2": b"28", + "SPEED_MODE_3": b"128", +} +RESPONSE_VALUES = {v: k for k, v in RESPONSES.items()} + +this_dir, this_filename = os.path.split(__file__) +commands_file = os.path.join(this_dir, "commands.csv") + + +def unpack_opts(field): + if not field: + return [] + options = field.split("|") + return list(map(lambda o: bytes(o, "ascii"), options)) + + +with open(commands_file) as csvfile: + reader = csv.reader(csvfile) + for row in reader: + if row[0] == "#": + continue + if row[1] == "name": + continue + cmd = bytes.fromhex(row[0]) + name = str.upper(str.rstrip(row[1])) + arg_size = int(row[2]) + res_size = int(row[3]) + res_opts = unpack_opts(row[4]) + + COMMANDS[name] = (cmd, arg_size, res_size, res_opts) \ No newline at end of file diff --git a/bdio/brain/nceserial/nceserial.py b/bdio/brain/nceserial/driver.py similarity index 66% rename from bdio/brain/nceserial/nceserial.py rename to bdio/brain/nceserial/driver.py index a919c84..845cabc 100644 --- a/bdio/brain/nceserial/nceserial.py +++ b/bdio/brain/nceserial/driver.py @@ -7,7 +7,7 @@ TIMEOUT_SEC = 1 -class NceSerial: +class NceSerialDriver: """NCE Command Station Serial Interface Driver""" def __init__(self): @@ -19,7 +19,7 @@ def __init__(self): ) def is_connected(self): - return self.serial.is_open() + return self.serial.is_open def connect(self, port): if self.is_connected(): @@ -33,5 +33,16 @@ def disconnect(self): raise serial.SerialException("NCE Serial not connected") self.serial.close() - def write(self, bytes): - return self + def write(self, data): + #print(f"driver: writing - {data}") + return self.serial.write(data) + + def read(self, size): + result = self.serial.read(size) + + #print(f"driver: read {size} bytes - {result.hex()}") + return result + + def read_write(self, data, size): + self.write(data) + return self.read(size) \ No newline at end of file diff --git a/bdio/brain/nceserial/driver_test.py b/bdio/brain/nceserial/driver_test.py new file mode 100644 index 0000000..ef26b58 --- /dev/null +++ b/bdio/brain/nceserial/driver_test.py @@ -0,0 +1,30 @@ +import pytest +from serial import Serial, SerialException +from .driver import NceSerialDriver + + +def test_construction(): + nce = NceSerialDriver() + assert isinstance(nce.serial, Serial) + + +def test_connect(mocker): + nce = NceSerialDriver() + mocker.patch("serial.Serial.open") + nce.connect("/dev/notexist") + assert nce.serial.port == "/dev/notexist" + + +def test_connect_fail(mocker): + nce = NceSerialDriver() + mocker.patch.object(nce, "is_connected", return_value=True) + with pytest.raises(SerialException) as excinfo: + nce.connect("port") + assert "already connected" in str(excinfo.value) + + +def test_is_connected(mocker): + nce = NceSerialDriver() + nce.serial.is_open = True + mocker.patch.object(nce, "noop", return_value=True) + assert nce.is_connected() \ No newline at end of file diff --git a/bdio/brain/nceserial/errors.py b/bdio/brain/nceserial/errors.py new file mode 100644 index 0000000..2d267c7 --- /dev/null +++ b/bdio/brain/nceserial/errors.py @@ -0,0 +1,9 @@ +class NceError(Exception): + """Base class for NCE exceptions in this module""" + + pass + + +class UnsupportedCommandError(NceError): + def __init__(self, command): + self.command = command \ No newline at end of file diff --git a/bdio/brain/nceserial/nceserial_test.py b/bdio/brain/nceserial/nceserial_test.py deleted file mode 100644 index c40721d..0000000 --- a/bdio/brain/nceserial/nceserial_test.py +++ /dev/null @@ -1,7 +0,0 @@ -from serial import Serial -from nceserial import NceSerial - - -def test_NceSerial_construction(): - nce = NceSerial() - assert isinstance(nce.serial, Serial) diff --git a/bdio/brain/nceserial/util.py b/bdio/brain/nceserial/util.py new file mode 100644 index 0000000..dc53fb7 --- /dev/null +++ b/bdio/brain/nceserial/util.py @@ -0,0 +1,9 @@ + +def int2addr(number): + """converts an integer to addr_hi and addr_lo bytes""" + return number >> 8, number & 255 + +def addr2int(addr): + """converts addr_hi and addr_lo into single number address""" + hi, lo = addr + return hi << 8 | lo & 255 \ No newline at end of file