Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pytest = "*"
coverage = "*"
pytest-cov = "*"
pyclean = "*"
rope = "*"
pytest-mock = "*"

[requires]
python_version = "3.7"
Expand Down
17 changes: 16 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions bdio/brain/brain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
from nceserial.driver import NceSerialDriver
from nceserial.commander import NceCommander
from time import sleep

"""RITMRC Layout Control and Signaling: Brain"""

if __name__ == "__main__": # is this required for an app entrypoint?
print("All aboard!") # Hello, world!
driver = NceSerialDriver()
driver.connect("/dev/ttyUSB0")
cmd_station = NceCommander(driver)
if cmd_station.is_online():
print("NCE Software version: " + cmd_station.version_formatted())
print("Time: " + cmd_station.clock_formatted())
while True:
print(cmd_station.aiu_status(4))
sleep(1)
else:
print("not connected")

Empty file.
54 changes: 54 additions & 0 deletions bdio/brain/nceserial/commander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import struct
from .commands import COMMANDS, RESPONSE_VALUES
from .util import int2addr

class NceCommander:
def __init__(self, driver):
self.driver = driver

def is_online(self):
return self.driver.is_connected() and self.noop()

def run_command(self, name):
try:
cmd, _, res_size, res_opts = COMMANDS[name]
except KeyError:
raise Exception(f"'{name}' is not a valid NCE command")

result = self.driver.read_write(cmd, res_size)
if len(res_opts):
if result not in res_opts:
# TODO: map this better from error response code to
# an error class
raise Exception(f"response not in possible result set")
return result

def clock(self):
"""Read command station clock setting"""
return self.run_command("CLOCK_READ")

def clock_formatted(self):
hour, minute = self.clock()
return f"{hour}:{minute}"

def version(self):
return self.run_command("VERSION")

def version_formatted(self):
VV, MM, mm = self.version()
return f"v{VV}.{MM}.{mm}"

def noop(self):
return self.run_command("NOOP")

def dummy(self):
return self.run_command("DUMMY")

def aiu_status(self, address):
cmd = struct.pack(">BB", 0x8A, address)
result = self.driver.read_write(cmd, 4)
#return [byte >> i & 1 for i in range(8) for byte in result]
return (
[int.from_bytes(result[0:2], byteorder='big') >> i & 1 for i in range(16)],
[int.from_bytes(result[2:4], byteorder='big') >> i & 1 for i in range(16)]
)
30 changes: 30 additions & 0 deletions bdio/brain/nceserial/commands.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
command,name,arg_size_bytes,res_size_bytes,res_options
80,noop ,0,1,!
81,assign_loco ,3,1,!|1|2
82,clock_read ,0,2,
83,clock_stop ,0,1,!
84,clock_start ,0,1,!
85,clock_set ,2,1,!|3
86,clock_swap_12_24 ,0,1,!|3
87,clock_set_ratio ,1,1,!|3
88,loco_dequeue_packet ,2,1,!|1|2
89,use_main_track ,0,1,!
8A,aiu_status ,1,4,
8B,use_prog_track ,0,1,!
8C,dummy ,0,3,!
8D,loco_set_speed ,3,1,!|1|3|14|28|128
8E,write_16 ,19,1,!|4
8F,read_16 ,2,16,
#
97,write_16 ,3,1,!
98,write_2 ,4,1,!
99,write_4 ,6,1,!
9A,write_8 ,10,1,!
9B,aiu_status_short ,1,2,
9C,run_macro ,1,1,!|0|3
9D,read_1 ,1,1,
9E,enter_prog_track_mode,0,1,!|3
9F,exit_prog_track_mode ,0,1,!
#
AA,version ,0,3,
AB,soft_reset ,0,0,
43 changes: 43 additions & 0 deletions bdio/brain/nceserial/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import csv
import os
from pprint import pprint

COMMANDS = {}
RESPONSES = {
"SUCCESS": b"!",
"UNSUPPORTED_COMMAND": b"0",
"ADDRESS_RANGE_ERROR": b"1",
"CAB_OP_RANGE_ERROR": b"2",
"DATA_RANGE_ERROR": b"3",
"BYTE_COUNT_RANGE_ERROR": b"4",
"SPEED_MODE_1": b"14",
"SPEED_MODE_2": b"28",
"SPEED_MODE_3": b"128",
}
RESPONSE_VALUES = {v: k for k, v in RESPONSES.items()}

this_dir, this_filename = os.path.split(__file__)
commands_file = os.path.join(this_dir, "commands.csv")


def unpack_opts(field):
if not field:
return []
options = field.split("|")
return list(map(lambda o: bytes(o, "ascii"), options))


with open(commands_file) as csvfile:
reader = csv.reader(csvfile)
for row in reader:
if row[0] == "#":
continue
if row[1] == "name":
continue
cmd = bytes.fromhex(row[0])
name = str.upper(str.rstrip(row[1]))
arg_size = int(row[2])
res_size = int(row[3])
res_opts = unpack_opts(row[4])

COMMANDS[name] = (cmd, arg_size, res_size, res_opts)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
TIMEOUT_SEC = 1


class NceSerial:
class NceSerialDriver:
"""NCE Command Station Serial Interface Driver"""

def __init__(self):
Expand All @@ -19,7 +19,7 @@ def __init__(self):
)

def is_connected(self):
return self.serial.is_open()
return self.serial.is_open

def connect(self, port):
if self.is_connected():
Expand All @@ -33,5 +33,16 @@ def disconnect(self):
raise serial.SerialException("NCE Serial not connected")
self.serial.close()

def write(self, bytes):
return self
def write(self, data):
#print(f"driver: writing - {data}")
return self.serial.write(data)

def read(self, size):
result = self.serial.read(size)

#print(f"driver: read {size} bytes - {result.hex()}")
return result

def read_write(self, data, size):
self.write(data)
return self.read(size)
30 changes: 30 additions & 0 deletions bdio/brain/nceserial/driver_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest
from serial import Serial, SerialException
from .driver import NceSerialDriver


def test_construction():
nce = NceSerialDriver()
assert isinstance(nce.serial, Serial)


def test_connect(mocker):
nce = NceSerialDriver()
mocker.patch("serial.Serial.open")
nce.connect("/dev/notexist")
assert nce.serial.port == "/dev/notexist"


def test_connect_fail(mocker):
nce = NceSerialDriver()
mocker.patch.object(nce, "is_connected", return_value=True)
with pytest.raises(SerialException) as excinfo:
nce.connect("port")
assert "already connected" in str(excinfo.value)


def test_is_connected(mocker):
nce = NceSerialDriver()
nce.serial.is_open = True
mocker.patch.object(nce, "noop", return_value=True)
assert nce.is_connected()
9 changes: 9 additions & 0 deletions bdio/brain/nceserial/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class NceError(Exception):
"""Base class for NCE exceptions in this module"""

pass


class UnsupportedCommandError(NceError):
def __init__(self, command):
self.command = command
7 changes: 0 additions & 7 deletions bdio/brain/nceserial/nceserial_test.py

This file was deleted.

9 changes: 9 additions & 0 deletions bdio/brain/nceserial/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

def int2addr(number):
"""converts an integer to addr_hi and addr_lo bytes"""
return number >> 8, number & 255

def addr2int(addr):
"""converts addr_hi and addr_lo into single number address"""
hi, lo = addr
return hi << 8 | lo & 255