Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 62 additions & 26 deletions circuitpython_kernel/board.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
# -*- coding: utf-8 -*-
"""Serial Connection to a Board"""
import logging
import time
from serial import Serial
from serial.tools.list_ports import comports
from serial.serialutil import SerialException

# Create BOARD_LOGGER for debug messages.
BOARD_LOGGER = logging.getLogger(__name__)

# Timeouts
ENTER_REPL_TIMEOUT = 5.0
REBOOT_WAIT = 0.5

# Vendor IDs
ADAFRUIT_VID = 0x239A # SAMD
ESP8266_VID = 0x10C4 # Huzzah ESP8266
PICO_VID = 0x239A # PICO PI
TEENSY_VID = 0x16C0 #PJRC Teensy 4.1
ADAFRUIT_VID = 0x239A # SAMD
ESP8266_VID = 0x10C4 # Huzzah ESP8266
PICO_VID = 0x239A # PICO PI
TEENSY_VID = 0x16C0 # PJRC Teensy 4.1

# repl commands
CHAR_CTRL_A = b'\x01'
Expand All @@ -24,9 +28,12 @@
# repl messages
MSG_NEWLINE = b"\r\n"
MSG_RAWREPL = b"raw REPL; CTRL-B to exit"
MSG_RAWREPL_PROMPT = b"\r\n>"
MSG_RAWREPL_BARE_PROMPT = b">"
MSG_RAWREPL_PROMPT = MSG_NEWLINE + MSG_RAWREPL_BARE_PROMPT
MSG_SOFT_REBOOT = b'soft reboot\r\n'
MSG_RELOAD = b'Use CTRL-D to reload.'
MSG_AUTORELOAD = b'Auto-reload is on.'


class BoardError(Exception):
"""Errors relating to board connections"""
Expand All @@ -51,7 +58,6 @@ def write(self, msg):
self.connected = False
raise BoardError(f"cannot write to board: {serial_error}")


def read_until(self, msg):
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could remove this, or make it into read_until(self, *msgs)

"""Reads board until end of `msg`.
"""
Expand All @@ -61,6 +67,21 @@ def read_until(self, msg):
self.connected = False
raise BoardError(f"cannot read from board: {serial_error}")

def read_until_any(self, *msgs):
"""Reads board until the tokens `msgs`.
"""
content = bytearray()
while True:
try:
got = self.serial.read(1)
except SerialException as serial_error:
self.connected = False
raise BoardError(f"cannot read from board: {serial_error}")

content.extend(got)
for msg in msgs:
if content.endswith(msg):
return content

def read_all(self):
"""Attempts to read all incoming msgs from board.
Expand All @@ -85,29 +106,43 @@ def softreset(self):
"""Resets the circuitpython board (^D)
from a jupyter cell.
"""
serial = self.serial
# in case the VM is in a weird state ...
self.enter_raw_repl()
# now do the soft reset ...
BOARD_LOGGER.debug("* ^D, soft reset")
serial.write(CHAR_CTRL_D)
serial.read_until(MSG_SOFT_REBOOT)
serial.read_until(MSG_RELOAD)
serial.write(b'\n')
serial.read_until(MSG_RAWREPL)
serial.read_until(MSG_RAWREPL_PROMPT)
self.write(CHAR_CTRL_D)
self.read_until(MSG_SOFT_REBOOT)
BOARD_LOGGER.debug("* waiting for fresh boot ...")
self.read_until_any(MSG_RELOAD, MSG_AUTORELOAD)
BOARD_LOGGER.debug("* saw boot message, sleeping before interrupt ...")
time.sleep(REBOOT_WAIT)
self.enter_raw_repl()
BOARD_LOGGER.debug("* soft reset complete, in raw repl")

def enter_raw_repl(self):
"""Enters the RAW circuitpython repl.
"""
BOARD_LOGGER.debug('* enter raw repl ...')
serial = self.serial
serial.write(CHAR_CTRL_C)
serial.write(CHAR_CTRL_A)
# wait for prompt
serial.read_until(MSG_RAWREPL)
serial.read_until(MSG_RAWREPL_PROMPT)
self.write(CHAR_CTRL_C)
self.write(CHAR_CTRL_A)
BOARD_LOGGER.debug('* waiting for prompt ...')
# just waiting for the ">" prompt, but sometimes there may be other
# text before it, which includes a ">", so read_until isn't correct.
start = time.monotonic()
while True:
msg = self.read_all()

if not msg:
if time.monotonic() - start > ENTER_REPL_TIMEOUT:
raise BoardError("no response from board")
time.sleep(0.01)
continue

BOARD_LOGGER.debug(f"got: {msg}")

if msg.endswith(MSG_RAWREPL_BARE_PROMPT):
break

BOARD_LOGGER.debug('* entered raw repl, returning to kernel...')

def connect(self):
Expand All @@ -120,8 +155,8 @@ def connect(self):
try:
BOARD_LOGGER.debug(f'connect: open {device}')
self.serial = Serial(device, 115200, parity='N')
except:
raise BoardError(f"failed to access {device}")
except Exception as e:
raise BoardError(f"failed to access {device}: {e}")
# open the port
if not self.serial.is_open:
try:
Expand All @@ -137,17 +172,18 @@ def connect(self):
try:
self.enter_raw_repl()
self.connected = True
except:
raise BoardError(f"failed to enter raw repl with {device}")

except Exception as e:
raise BoardError(f"failed to enter raw repl with {device}: {e}")

def _find_board(self):
"""Find serial port where an Adafruit board is connected"""
for port in comports():
# print out each device
BOARD_LOGGER.debug(port.device)
if port.vid == ADAFRUIT_VID or port.vid == ESP8266_VID or port.vid == PICO_VID or port.vid == TEENSY_VID:
BOARD_LOGGER.debug(f"CircuitPython Board Found at: {port.device}")
if port.vid == ADAFRUIT_VID or port.vid == ESP8266_VID or \
port.vid == PICO_VID or port.vid == TEENSY_VID:
BOARD_LOGGER.debug(
f"CircuitPython Board Found at: {port.device}")
BOARD_LOGGER.debug(f"Connected? {self.connected}")
return port.device
raise BoardError("found no board")