diff --git a/.gitignore b/.gitignore index 6e29362..409e4bc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,9 @@ micropython/ target/ src/settings.py +#cached templated file +src/templates/*.py + __pycache__ temp/ #OS garbage diff --git a/src/gunpla/base_gundam.py b/src/gunpla/base_gundam.py index b07f5d8..648ba9d 100644 --- a/src/gunpla/base_gundam.py +++ b/src/gunpla/base_gundam.py @@ -10,7 +10,9 @@ class BaseGundam: Base Gunpla. """ - def __init__(self): + def __init__(self, hardware): + from src.hardware.Hardware import Hardware + self.hardware: Hardware = hardware with open(self.get_config_file()) as config_contents: self.config: json = json.loads(config_contents.read()) @@ -37,9 +39,6 @@ def led_off(self, led_name: str) -> None: led = self._get_led_from_name(led_name) led.off() - # TODO: make this not need the safe_execution and do it when we register paths - - @safe_execution def all_on(self) -> None: """ Turns all configured LED's on. @@ -62,8 +61,6 @@ def _all_leds_on(self) -> str: leds += f"{led_name}: on\n" return leds - # TODO: make this not need the safe_execution and do it when we register paths - @safe_execution def all_off(self) -> None: """ Turns all configured LED's off @@ -110,7 +107,7 @@ def _get_led_from_name(self, led_name: str) -> LED: if 'disabled' in entry and entry['disabled']: print(f"{led_name} is disabled") return DisabledLED(led_name) - return LED(entry['pin'], led_name) + return self.hardware.create_led(entry['pin'], led_name) def __get_entry_from_name(self, led_name: str) -> json: """ diff --git a/src/hardware/Hardware.py b/src/hardware/Hardware.py index 42a62c6..5ea98a1 100644 --- a/src/hardware/Hardware.py +++ b/src/hardware/Hardware.py @@ -22,3 +22,12 @@ def networking(self) -> Networking: def board_led(self) -> BoardLED: raise NotImplementedError + + def create_led(self, pin_number: int, name: str): + """ + Creates an LED instance appropriate for this hardware. + :param pin_number: GPIO pin number + :param name: LED name + :return: LED instance (LED or MockLED) + """ + raise NotImplementedError diff --git a/src/hardware/PicoHardwre.py b/src/hardware/PicoHardwre.py index 7b5a45e..12849b4 100644 --- a/src/hardware/PicoHardwre.py +++ b/src/hardware/PicoHardwre.py @@ -35,3 +35,9 @@ def get_pwm(self, pin_obj): def reset_pin(self, pin_num): """Re-initializes the pin to clear PWM settings""" return self.get_pin(pin_num, mode="OUT") + + def create_led(self, pin_number: int, name: str): + """Creates a real LED with actual GPIO pin""" + from src.pi.LED import LED + pin = self.get_pin(pin_number, mode="OUT") + return LED(pin, name) diff --git a/src/hardware/VirtualHardware.py b/src/hardware/VirtualHardware.py index ca8c031..584ea2c 100644 --- a/src/hardware/VirtualHardware.py +++ b/src/hardware/VirtualHardware.py @@ -81,3 +81,9 @@ def networking(self) -> Networking: def reset_pin(self, pin_num): print(f"[SIM] Pin {pin_num} reset to standard GPIO") + + def create_led(self, pin_number: int, name: str): + """Creates a mock LED for simulation""" + from src.pi.LED import MockLED + pin = self.get_pin(pin_number, mode="OUT") + return MockLED(pin, name) diff --git a/src/pi/LED.py b/src/pi/LED.py index 54a750a..4ba1bdc 100644 --- a/src/pi/LED.py +++ b/src/pi/LED.py @@ -5,10 +5,12 @@ class LED: A wrapper around Pin(n, Pin.OUT) that provides LED light like functionality to turn on, turn off, dim, etc """ - def __init__(self, pin_number: int, name: str): - from machine import Pin - - self._pin: Pin = Pin(pin_number, Pin.OUT) + def __init__(self, pin, name: str): + """ + :param pin: A Pin object (from machine.Pin or MockPin) + :param name: The LED name + """ + self._pin = pin self._led_name = name def enabled(self) -> bool: @@ -40,3 +42,20 @@ def pin(self): :return: The underlying Raspberry Pi Pico Pin of the LED. """ return self._pin + + +class MockLED(LED): + """ + LED implementation for simulation that prints actions to console. + Used when running with VirtualHardware for testing without physical hardware. + """ + + def on(self): + """Turns on the LED with simulation output""" + print(f"[SIM] LED '{self._led_name}' (Pin {self._pin.num}) ON") + self._pin.on() + + def off(self): + """Turns off the LED with simulation output""" + print(f"[SIM] LED '{self._led_name}' (Pin {self._pin.num}) OFF") + self._pin.off() diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py index 6261f45..ff1f38e 100644 --- a/src/server/Wrappers.py +++ b/src/server/Wrappers.py @@ -10,7 +10,10 @@ def safe_execution(func): async def wrapper(*args, **kwargs): try: print(f"Trying to execute {func.__name__}") - await func(*args, **kwargs) + result = await func(*args, **kwargs) + # If the function returns a value, use it; otherwise return success JSON + if result is not None: + return result return {"status": "success", "action": func.__name__}, 202 except Exception as e: # Log the error to console diff --git a/src/server/microdot/Microdot.py b/src/server/microdot/Microdot.py index 04da2ec..24f1b98 100644 --- a/src/server/microdot/Microdot.py +++ b/src/server/microdot/Microdot.py @@ -491,7 +491,7 @@ def form(self): if self.content_type is None: return None mime_type = self.content_type.split(';')[0] - if mime_type != 'application/x-www-form-urlencoded': + if mime_type != 'application/x-templates-form-urlencoded': return None self._form = self._parse_urlencoded(self.body) return self._form diff --git a/src/server/microdot/utemplate.py b/src/server/microdot/utemplate.py new file mode 100644 index 0000000..bf30efb --- /dev/null +++ b/src/server/microdot/utemplate.py @@ -0,0 +1,70 @@ +from src.server.utemplate import recompile + +_loader = None + + +class Template: + """A template object. + + :param template: The filename of the template to render, relative to the + configured template directory. + """ + @classmethod + def initialize(cls, template_dir='templates', + loader_class=recompile.Loader): + """Initialize the templating subsystem. + + :param template_dir: the directory where templates are stored. This + argument is optional. The default is to load + templates from a *templates* subdirectory. + :param loader_class: the ``utemplate.Loader`` class to use when loading + templates. This argument is optional. The default + is the ``recompile.Loader`` class, which + automatically recompiles templates when they + change. + """ + global _loader + _loader = loader_class(None, template_dir) + + def __init__(self, template): + if _loader is None: # pragma: no cover + self.initialize() + #: The name of the template + self.name = template + self.template = _loader.load(template) + + def generate(self, *args, **kwargs): + """Return a generator that renders the template in chunks, with the + given arguments.""" + return self.template(*args, **kwargs) + + def render(self, *args, **kwargs): + """Render the template with the given arguments and return it as a + string.""" + return ''.join(self.generate(*args, **kwargs)) + + def generate_async(self, *args, **kwargs): + """Return an asynchronous generator that renders the template in + chunks, using the given arguments.""" + class sync_to_async_iter(): + def __init__(self, iter): + self.iter = iter + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.iter) + except StopIteration: + raise StopAsyncIteration + + return sync_to_async_iter(self.generate(*args, **kwargs)) + + async def render_async(self, *args, **kwargs): + """Render the template with the given arguments asynchronously and + return it as a string.""" + response = '' + async for chunk in self.generate_async(*args, **kwargs): + response += chunk + return response diff --git a/src/server/utemplate/compiled.py b/src/server/utemplate/compiled.py new file mode 100644 index 0000000..82237a4 --- /dev/null +++ b/src/server/utemplate/compiled.py @@ -0,0 +1,14 @@ +class Loader: + + def __init__(self, pkg, dir): + if dir == ".": + dir = "" + else: + dir = dir.replace("/", ".") + "." + if pkg and pkg != "__main__": + dir = pkg + "." + dir + self.p = dir + + def load(self, name): + name = name.replace(".", "_") + return __import__(self.p + name, None, None, (name,)).render diff --git a/src/server/utemplate/recompile.py b/src/server/utemplate/recompile.py new file mode 100644 index 0000000..6c2bcdf --- /dev/null +++ b/src/server/utemplate/recompile.py @@ -0,0 +1,22 @@ +# (c) 2014-2020 Paul Sokolovsky. MIT license. +try: + from uos import remove, stat +except: + from os import stat, remove + +from . import source + + +class Loader(source.Loader): + + def load(self, name): + o_path = self.pkg_path + self.compiled_path(name) + i_path = self.pkg_path + self.dir + "/" + name + try: + o_stat = stat(o_path) + i_stat = stat(i_path) + if i_stat[8] > o_stat[8]: + # input file is newer, remove output to force recompile + remove(o_path) + finally: + return super().load(name) diff --git a/src/server/utemplate/source.py b/src/server/utemplate/source.py new file mode 100644 index 0000000..39ae122 --- /dev/null +++ b/src/server/utemplate/source.py @@ -0,0 +1,188 @@ +# (c) 2014-2019 Paul Sokolovsky. MIT license. +from . import compiled + + +class Compiler: + + START_CHAR = "{" + STMNT = "%" + STMNT_END = "%}" + EXPR = "{" + EXPR_END = "}}" + + def __init__(self, file_in, file_out, indent=0, seq=0, loader=None): + self.file_in = file_in + self.file_out = file_out + self.loader = loader + self.seq = seq + self._indent = indent + self.stack = [] + self.in_literal = False + self.flushed_header = False + self.args = "*a, **d" + + def indent(self, adjust=0): + if not self.flushed_header: + self.flushed_header = True + self.indent() + self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args)) + self.stack.append("def") + self.file_out.write(" " * (len(self.stack) + self._indent + adjust)) + + def literal(self, s): + if not s: + return + if not self.in_literal: + self.indent() + self.file_out.write('yield """') + self.in_literal = True + self.file_out.write(s.replace('"', '\\"')) + + def close_literal(self): + if self.in_literal: + self.file_out.write('"""\n') + self.in_literal = False + + def render_expr(self, e): + self.indent() + self.file_out.write('yield str(' + e + ')\n') + + def parse_statement(self, stmt): + tokens = stmt.split(None, 1) + if tokens[0] == "args": + if len(tokens) > 1: + self.args = tokens[1] + else: + self.args = "" + elif tokens[0] == "set": + self.indent() + self.file_out.write(stmt[3:].strip() + "\n") + elif tokens[0] == "include": + if not self.flushed_header: + # If there was no other output, we still need a header now + self.indent() + tokens = tokens[1].split(None, 1) + args = "" + if len(tokens) > 1: + args = tokens[1] + if tokens[0][0] == "{": + self.indent() + # "1" as fromlist param is uPy hack + self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2]) + self.indent() + self.file_out.write("yield from _.render(%s)\n" % args) + return + + with self.loader.input_open(tokens[0][1:-1]) as inc: + self.seq += 1 + c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq) + inc_id = self.seq + self.seq = c.compile() + self.indent() + self.file_out.write("yield from render%d(%s)\n" % (inc_id, args)) + elif len(tokens) > 1: + if tokens[0] == "elif": + assert self.stack[-1] == "if" + self.indent(-1) + self.file_out.write(stmt + ":\n") + else: + self.indent() + self.file_out.write(stmt + ":\n") + self.stack.append(tokens[0]) + else: + if stmt.startswith("end"): + assert self.stack[-1] == stmt[3:] + self.stack.pop(-1) + elif stmt == "else": + assert self.stack[-1] == "if" + self.indent(-1) + self.file_out.write("else:\n") + else: + assert False + + def parse_line(self, l): + while l: + start = l.find(self.START_CHAR) + if start == -1: + self.literal(l) + return + self.literal(l[:start]) + self.close_literal() + sel = l[start + 1] + # print("*%s=%s=" % (sel, EXPR)) + if sel == self.STMNT: + end = l.find(self.STMNT_END) + assert end > 0 + stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip() + self.parse_statement(stmt) + end += len(self.STMNT_END) + l = l[end:] + if not self.in_literal and l == "\n": + break + elif sel == self.EXPR: + # print("EXPR") + end = l.find(self.EXPR_END) + assert end > 0 + expr = l[start + len(self.START_CHAR + self.EXPR):end].strip() + self.render_expr(expr) + end += len(self.EXPR_END) + l = l[end:] + else: + self.literal(l[start]) + l = l[start + 1:] + + def header(self): + self.file_out.write("# Autogenerated file\n") + + def compile(self): + self.header() + for l in self.file_in: + self.parse_line(l) + self.close_literal() + return self.seq + + +class Loader(compiled.Loader): + + def __init__(self, pkg, dir): + super().__init__(pkg, dir) + self.dir = dir + if pkg == "__main__": + # if pkg isn't really a package, don't bother to use it + # it means we're running from "filesystem directory", not + # from a package. + pkg = None + + self.pkg_path = "" + if pkg: + p = __import__(pkg) + if isinstance(p.__path__, str): + # uPy + self.pkg_path = p.__path__ + else: + # CPy + self.pkg_path = p.__path__[0] + self.pkg_path += "/" + + def input_open(self, template): + path = self.pkg_path + self.dir + "/" + template + return open(path) + + def compiled_path(self, template): + return self.dir + "/" + template.replace(".", "_") + ".py" + + def load(self, name): + try: + return super().load(name) + except (OSError, ImportError): + pass + + compiled_path = self.pkg_path + self.compiled_path(name) + + f_in = self.input_open(name) + f_out = open(compiled_path, "w") + c = Compiler(f_in, f_out, loader=self) + c.compile() + f_in.close() + f_out.close() + return super().load(name) diff --git a/src/server/webserver.py b/src/server/webserver.py index 025cd63..1b633bc 100644 --- a/src/server/webserver.py +++ b/src/server/webserver.py @@ -4,7 +4,8 @@ from src.gunpla.generic_gundam import GenericGundam from src.hardware.Hardware import Hardware from src.pi.led_effect import LEDEffects -from src.server.microdot.Microdot import Microdot, Request, send_file +from src.server.microdot.Microdot import Microdot, Request +from src.server.microdot.utemplate import Template from src.server.Wrappers import create_show_handler, safe_execution @@ -16,25 +17,41 @@ class WebServer: def __init__(self, configuration: dict, hardware: Hardware): self.app = Microdot() self.settings: dict = configuration - self.gundam: GenericGundam = configuration['model'] + # Instantiate the model class with hardware + self.gundam: GenericGundam = configuration['model'](hardware) self.hardware: Hardware = hardware + Template.initialize(template_dir='src/templates') @safe_execution async def index(self, request: Request): """ Returns the root index page """ - # Todo fix this rendering - return await send_file("src/www/index.html") + led_list = [{"name": led.name()} for led in self.gundam.get_all_leds()] + show_list = self.gundam.config['lightshow'] + + return await Template('index.html').render_async( + name_of_title="Gundam LED Control", + all_leds=led_list, + lightshows=show_list + ), 200, {'Content-Type': 'text/html'} @safe_execution async def canary(self, request: Request): """ Sanity check to make sure webserver is running. """ - asyncio.create_task(LEDEffects.blink(self.hardware.board_led)) + asyncio.create_task(LEDEffects.blink(self.hardware.board_led())) return "chirp", 202 + def all_on(self, request: Request): + self.gundam.all_on() + return "All leds are on", 202 + + def all_off(self, request: Request): + self.gundam.all_off() + return "All leds are off", 202 + async def _connect_to_wifi(self): ipaddress: str = await self.hardware.networking().connect_to_wifi(self.settings['ssid'], self.settings['password']) if ipaddress: @@ -75,8 +92,8 @@ async def led_on_handler(request, led_name): async def led_off_handler(request, led_name): return self.gundam.led_off(led_name) - self.app.route("/all/on")(self.gundam.all_on) - self.app.route("/all/off")(self.gundam.all_off) + self.app.route("/all/on")(self.all_on) + self.app.route("/all/off")(self.all_off) # dynamically add all lightshow paths for lightshow in self.gundam.config['lightshow']: @@ -87,6 +104,22 @@ async def led_off_handler(request, led_name): # 404 Handler @self.app.errorhandler(404) - def not_found(request): - # TODO: list all routes - return "Not found", 404 + async def not_found(request): + # Microdot stores routes in the url_map which is a tuple + urls: list[str] = [] + for route in self.app.url_map: + # route is a tuple: (method, path_re, handler) + path = route[1].url_pattern # The regex pattern of the URL + + if "" in path: + x = [led.name() for led in self.gundam.get_all_leds() if led.enabled() ] + for led_name in x: + complete_path = path.replace("", led_name) + urls.append(complete_path) + else: + urls.append(path) + + return await Template('404.html').render_async( + name_of_title="404", + urls=urls, + ), 404, {'Content-Type': 'text/html'} diff --git a/src/templates/404.html b/src/templates/404.html new file mode 100644 index 0000000..56fc7e1 --- /dev/null +++ b/src/templates/404.html @@ -0,0 +1,23 @@ +{% args name_of_title, urls %} + + + + {{ name_of_title }} + + + +

Page not found 404

+ +
+
+

List of available pages

+ + + + \ No newline at end of file diff --git a/src/templates/index.html b/src/templates/index.html new file mode 100644 index 0000000..daabc8c --- /dev/null +++ b/src/templates/index.html @@ -0,0 +1,45 @@ +{% args name_of_title, all_leds, lightshows %} + + + + {{ name_of_title }} + + + +

Control individual LEDs

+ + +

Light Shows

+ + +
+

Global Controls

+
+ +
+
+ +
+ + + \ No newline at end of file diff --git a/src/test.py b/src/test.py index eed0021..daa3890 100644 --- a/src/test.py +++ b/src/test.py @@ -4,8 +4,6 @@ import time -from machine import Pin - def main(): """ diff --git a/src/www/index.html b/src/www/index.html deleted file mode 100644 index ea2b65e..0000000 --- a/src/www/index.html +++ /dev/null @@ -1,35 +0,0 @@ - - -{{title}} - - -

Control individual leds

-{{ -"".join(["""
  • -
    - -
    -
    - -
    -
  • \n""".format(itm["name"]) for itm in all_leds]) -}} - -

    Light Show

    -{{ -"".join(["""
  • -
    - -
    -
  • \n""".format(itm['path'], itm["name"]) for itm in lightshows]) -}} -
    -

    Sanity

    -
    - -
    -
    - -
    - - \ No newline at end of file diff --git a/tests/LocalServerTest.py b/tests/LocalServerTest.py index 9d74f6d..476192a 100644 --- a/tests/LocalServerTest.py +++ b/tests/LocalServerTest.py @@ -16,13 +16,13 @@ class MobileDoll(GenericGundam): Mobile Doll """ - def __init__(self, model_config: json): - self.config = model_config + def __init__(self, hardware, model_config: json = None): + self.hardware = hardware + self.config = model_config if model_config else {} def get_config_file(self) -> str: return "tests/config/virgo.json" - # TODO: refactor light show url creation to be a decorator and also not need the request. async def activation(self): print("Mobile Doll activation") return @@ -35,7 +35,7 @@ def main(): "name": "Virgo", "leds": [ - {"name": "head", "pin": 0, "color": "green", "disabled": True}, + {"name": "head", "pin": 0, "color": "green", "disabled": False}, ], "lightshow": [ { @@ -50,7 +50,7 @@ def main(): "ssid": "wifi", "password": 'wifi-pass', "hostname": 'virgo', - "model": MobileDoll(model_config) + "model": lambda hardware: MobileDoll(hardware, model_config) } webserver = WebServer(test_settings, VirtualHardware())