diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..28f4b7c --- /dev/null +++ b/.gitignore @@ -0,0 +1,134 @@ +### MANUAL ### + + +### TEMPLATES ### +## Ignoreable Python Files (via https://github.com/github/gitignore/blob/main/Python.gitignore) +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ diff --git a/bizhook/memory.py b/bizhook/memory.py index b19825b..2370847 100644 --- a/bizhook/memory.py +++ b/bizhook/memory.py @@ -2,10 +2,31 @@ from socket import socket as Socket from socket import create_connection -from typing import Union - from .exceptions import InvalidRequest, InvalidResponse +''' +Planned: +client: + load rom + reset? +''' +QUERY_TYPE = { + "INPUT": 0, + "READ": 1, + "WRITE": 2, + "CLIENT": 3 +} + +RESPONSE_CODES = { + "INPUT": 0, # Successfully passed input + "BYTE": 1, # Successfully read byte + "INTEGER": 2, # Successfully read integer + "FLOAT": 3, # Successfully read float + "ERROR": 4 # Generic error +} + + +DELIMITER = '/' class Memory: @@ -29,49 +50,17 @@ def __init__(self, self.port = port - def _request(self, query: Union[bytes, str]): - """Send a request to the Bizhawk Lua memory hook - - - Pattern: DOMN/ADDR/TSLE/VLUE - - DOMN - Memory domain - ADDR - Address - TSLE--.-------------------. - Type: Signage: - | * [b]yte * [u]nsigned - | * [i]nteger * [s]igned - | * [f]loat - | - .-------------------. - Size: Endianness: - * [1] byte * [l]ittle endian - * [2] bytes * [b]ig endian - * [3] bytes - * [4] bytes - - VLUE - Integer or float, ex. 12 or -1.2 - - - # EXAMPLES - - VRAM/11423051/iu4b/23 - Write (23) to (11423051) in (VRAM) - an [i]nteger, [u]nsigned, - [4] bytes long and [b]ig endian - - WRAM/21512962/fs2l/ - Read from (21512962) in (WRAM) - a [f]loat, [s]igned, - [2] bytes long and [l]ittle endian - """ + def _request(self, query): + # print("_request") # Socket requires a byte string to send if type(query) is not bytes: query = query.encode() + # print("_request: query:", query.decode('ascii')) with create_connection((self.address, self.port)) as socket: + # print("_request: sending") # Send request and expect response socket.sendall(query) response = self._receive(socket) @@ -81,25 +70,26 @@ def _request(self, query: Union[bytes, str]): # Extract response code and message code, _, message = response.decode('UTF-8').partition('_') code = int(code) + # print(f"_request: code={code} message={message}") except ValueError: raise InvalidResponse('Response could not be divided into code and message') # Successfully wrote to memory - if code == 0: + if code == RESPONSE_CODES["INPUT"]: return True # Successfully read byte - if code == 1: + if code == RESPONSE_CODES["BYTE"]: return response[response.index(b'_'):] # Successfully read integer - if code == 2: + if code == RESPONSE_CODES["INTEGER"]: return int(message) # Successfully read float - if code == 3: + if code == RESPONSE_CODES["FLOAT"]: return float(message) @@ -112,179 +102,56 @@ def _receive(self, socket: Socket, n: int=128): # and concatenate them at the end buffer = [] - while True: data = socket.recv(n) if not data: + # print("_receive: no data") break buffer.append(data) return b''.join(buffer) - def _format_tsle(self, type_: type, signed: bool, size: int, endianness: str): - """Format the type, signage, size, and endianness for a request""" - - if type_ not in (bytes, int, float): - raise ValueError('Type must be bytes, int or float') + def build_query(self, query_type: int, address: int=0x00, button_name: str=None, button_state: bool=None): + ''' + QUERY FORMATS: - if size not in (1, 2, 3, 4): - raise ValueError('Size must be 1, 2, 3 or 4 bytes') + [input] + 0 / button_name / button_state / - if endianness not in ('little', 'big'): - raise ValueError('Endianness must be little or big') + [read bytes] + 1 / domain / address / + ''' + if query_type == QUERY_TYPE['INPUT']: + # 0 / button_name / button_state / + try: + query = str(query_type) + DELIMITER + button_name + DELIMITER + str(button_state) + DELIMITER + return query + except TypeError as e: + raise(f"Arguments missing from query...\n{e}") + - return ''.join([ - type_.__name__[0], - 'us'[signed], - str(size), - endianness[0] - ]) - - def _format_query(self, address: int, type_: type=bytes, signed: bool=False, - size: int=1, endianness: str='big', value: Union[int, float]=None): - """Format all request parameters into a valid query for a request""" + elif query_type == QUERY_TYPE['READ']: + # 1 / domain / address / + try: + query = str(query_type) + DELIMITER + str(self.domain) + DELIMITER + str(address) + DELIMITER + return query + except TypeError as e: + raise(f"Arguments missing from query...\n{e}") - tsle = self._format_tsle(type_, signed, size, endianness) - return f'{self.domain}/{address}/{tsle}/{"" if value is None else value}' + else: + raise("Invalid argument type") + return def read_byte(self, address: int): """Read byte from memory""" - return self._request(self._format_query( - address=address, - type_=bytes, - value=None - )) - - def write_byte(self, address: int, value: int): - """Write byte from memory""" - return self._request(self._format_query( - address=address, - type_=bytes, - value=value - )) - - def read_int(self, address: int, signed: bool=None, size: int=None, endianness: str=None): - """Read integer from memory""" - return self._request(self._format_query( - address=address, - type_=int, - signed=signed if signed is not None else self.default_signed, - size=size if size is not None else self.default_size, - endianness=endianness if endianness is not None else self.default_endianness, - value=None - )) - - def write_int(self, address: int, value: int, signed: bool=None, size: int=None, endianness: str=None): - """Write integer from memory""" - return self._request(self._format_query( - address=address, - type_=int, - signed=signed if signed is not None else self.default_signed, - size=size if size is not None else self.default_size, - endianness=endianness if endianness is not None else self.default_endianness, - value=value - )) - - def read_float(self, address: int, endianness: str=None): - """Read float from memory""" - return self._request(self._format_query( - address=address, - type_=float, - endianness=endianness if endianness is not None else self.default_endianness, - value=None - )) - - def write_float(self, address: int, value: float, endianness: str=None): - """Write float from memory""" - return self._request(self._format_query( - address=address, - type_=float, - endianness=endianness if endianness is not None else self.default_endianness, - value=value - )) - - - # |--------------------------------- - # | Non-argumentative alternatives - - def read_u8(self, address: int): - return self.read_int(address, signed=False, size=1) - - def read_u16_be(self, address: int): - return self.read_int(address, signed=False, size=2, endianness='big') - - def read_u24_be(self, address: int): - return self.read_int(address, signed=False, size=3, endianness='big') - - def read_u32_be(self, address: int): - return self.read_int(address, signed=False, size=4, endianness='big') - - def read_u16_le(self, address: int): - return self.read_int(address, signed=False, size=2, endianness='little') - - def read_u24_le(self, address: int): - return self.read_int(address, signed=False, size=3, endianness='little') - - def read_u32_le(self, address: int): - return self.read_int(address, signed=False, size=4, endianness='little') - - def read_s16_be(self, address: int): - return self.read_int(address, signed=True, size=2, endianness='big') - - def read_s24_be(self, address: int): - return self.read_int(address, signed=True, size=3, endianness='big') - - def read_s32_be(self, address: int): - return self.read_int(address, signed=True, size=4, endianness='big') - - def read_s16_le(self, address: int): - return self.read_int(address, signed=True, size=2, endianness='little') - - def read_s24_le(self, address: int): - return self.read_int(address, signed=True, size=3, endianness='little') - - def read_s32_le(self, address: int): - return self.read_int(address, signed=True, size=4, endianness='little') - - def write_u8(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=1) - - def write_u16_be(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=2, endianness='big') - - def write_u24_be(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=3, endianness='big') - - def write_u32_be(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=4, endianness='big') - - def write_u16_le(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=2, endianness='little') - - def write_u24_le(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=3, endianness='little') - - def write_u32_le(self, address: int, value: int): - return self.write_int(address, value, signed=False, size=4, endianness='little') - - def write_s16_be(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=2, endianness='big') - - def write_s24_be(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=3, endianness='big') - - def write_s32_be(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=4, endianness='big') - - def write_s16_le(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=2, endianness='little') - - def write_s24_le(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=3, endianness='little') + q = self.build_query(QUERY_TYPE["READ"], address=address) + return self._request(q) - def write_s32_le(self, address: int, value: int): - return self.write_int(address, value, signed=True, size=4, endianness='little') \ No newline at end of file + def send_input(self, key_name: str, key_state: bool): + """Pass input to emulator""" + q = self.build_query(QUERY_TYPE["INPUT"], button_name=key_name, button_state=key_state) + return self._request(q)