diff --git a/api_gen.py b/api_gen.py index f2fb937..bd91f80 100755 --- a/api_gen.py +++ b/api_gen.py @@ -23,10 +23,10 @@ class BaseEl(object): type_id = 'abstract base' def __init__(self, attrs): self.attrs = attrs - + def __repr__(self): return getattr(self, 'name', super(BaseEl, self).__repr__()) - + def __getattr__(self, attr): if self.__dict__.has_key(attr): return self.__dict__[attr] @@ -34,32 +34,32 @@ def __getattr__(self, attr): return self.attrs[attr] else: raise AttributeError("object '{0}' has no attribute '{1}'".format(self.type_id, attr)) - + class ApiEl(BaseEl): type_id = 'api' def __init__(self, attrs): super(ApiEl, self).__init__(attrs) self.classes = OrderedDict() - + def __repr__(self): return 'API object (device={0})'.format(self.device_name) - + def add_class(self, obj): obj.parent = self self.classes[obj.name] = obj - + def add_datatypes(self, obj): print('add datatypes') def generate_commands(self): lines = [c.generate_commands() for c in self.classes.values()] return sum(lines, []) - + def generate_events(self): lines = [c.generate_events() for c in self.classes.values()] return sum(lines, []) - + def generate_module_header(self): return ["""# -*- coding: utf-8 -*- @@ -69,7 +69,7 @@ def generate_module_header(self): import numpy as np import ctypes -uart = ctypes.CDLL('./uart.so') +uart = ctypes.CDLL('./uart.so') import logging logger = logging.getLogger(__name__) @@ -77,21 +77,21 @@ def generate_module_header(self): """] - + def generate_enums(self): lines = [c.generate_enums() for c in self.classes.values()] return sum(lines, []) - + def generate_boilerplate(self): return [""" - + class Header(ctypes.Structure): _fields_ = [("type_hilen", ctypes.c_ubyte), ("lolen", ctypes.c_ubyte), ("cls", ctypes.c_ubyte), ("command", ctypes.c_ubyte)] - + def init_connect(uart_port): \"\"\"Connects to the port, resets the device, then reconnects. Appears not to work correctly under Mac OS X\"\"\" @@ -118,14 +118,14 @@ def disconnect(): def send_message(packet, data): if data is not None: raise NotImplementedError - + logger.debug('sending: {0}'.format(np.fromstring(packet, dtype=np.uint8))) - + error = uart.uart_tx(len(packet), packet) if error: raise RuntimeError('failed to transmit') - - + + def read_message(timeout_ms): hdr = Header() rx = uart.uart_rx(4, ctypes.byref(hdr), timeout_ms) @@ -135,9 +135,9 @@ def read_message(timeout_ms): raise RuntimeError('read header failed with error {0}'.format(rx)) logger.debug('header: {0}'.format([hdr.type_hilen, hdr.lolen, hdr.cls, hdr.command])) - + data = '' - + if hdr.lolen: data = ctypes.create_string_buffer(256) rx = uart.uart_rx(hdr.lolen, data, timeout_ms) @@ -148,21 +148,23 @@ def read_message(timeout_ms): else: data = str(data[0:rx]) logger.debug('data: {0}'.format(np.fromstring(data, np.uint8))) - + if hdr.type_hilen + hdr.lolen + hdr.cls + hdr.command == 0: raise IOError('received null header') - + command, param_string = callbacks[(hdr.type_hilen & 0xf8, hdr.cls, hdr.command)] params = convert_params(param_string, data) - + logger.info('calling: {0}({1})'.format(command, params)) globals()[command](*params) - - + + def convert_params(param_string, data): params = [] for typename in param_string.split(', '): - if typename == 'bd_addr': + if typename == '': + param = None + elif typename == 'bd_addr': param = list(np.fromstring(data[0:6], np.uint8)) data = data[6:] elif typename == 'uint8array': @@ -171,13 +173,14 @@ def convert_params(param_string, data): data = data[(size + 1):] else: dtype = np.dtype(typename) - param = np.fromstring(data[0:dtype.itemsize], dtype)[0] + param = np.fromstring(data[0:dtype.itemsize], dtype)[0] data = data[dtype.itemsize:] - params.append(param) - - return params - - + if param is not None: + params.append(param) + + return params + + """] def generate_callbacks(self): @@ -186,8 +189,8 @@ def generate_callbacks(self): d = c.generate_callback_dict() lines += ["\t({0}, {1}),".format(k, v) for k, v in d.items()] return lines + ['])'] - - + + class ClassEl(BaseEl): type_id = 'class' def __init__(self, attrs): @@ -195,15 +198,15 @@ def __init__(self, attrs): self.commands = OrderedDict() self.events = OrderedDict() self.enums = [] - + def add_command(self, obj): obj.cls = self self.commands[obj.name] = obj - + def add_event(self, obj): obj.cls = self self.events[obj.name] = obj - + def add_enums(self, obj): self.enums.extend(obj.enums) # Note throws out some context @@ -214,15 +217,15 @@ def generate_commands(self): def generate_events(self): lines = [e.generate() for e in self.events.values()] return sum(lines, []) - + def generate_enums(self): lines = [e.generate() for e in self.enums] label = ['# Class {0} enums'.format(self.name)] result = label + sum(lines, []) return result + [''] if len(result) > 1 else [''] - + def generate_callback_dict(self): - rsps = [(c.callback_hdr(), c.callback()) for c in self.commands.values() if len(c.returns) > 0] + rsps = [(c.callback_hdr(), c.callback()) for c in self.commands.values()] evts = [(e.callback_hdr(), e.callback()) for e in self.events.values()] return OrderedDict(rsps + evts) @@ -234,119 +237,120 @@ def __init__(self, attrs): self.params = [] self.returns = [] self.cls = None - + def add_params(self, obj): self.params = obj.params - + def add_returns(self, obj): self.returns = obj.params - + def hdr(self): type_hilen = 0x00 + self.hilen lolen = sum([p.size() for p in self.params]) - hdr = [type_hilen, lolen, int(self.cls.index), int(self.index)] + hdr = [type_hilen, lolen, int(self.cls.index), int(self.index)] return tuple(hdr) - + def callback_hdr(self): type_hilen = 0x00 + self.hilen lolen = sum([r.size() for r in self.returns]) - hdr = [type_hilen & 0xf8, int(self.cls.index), int(self.index)] + hdr = [type_hilen & 0xf8, int(self.cls.index), int(self.index)] return tuple(hdr) - + def callback(self): name = '_'.join(['rsp', self.cls.name, self.name]) params = ', '.join([r.type for r in self.returns]) - return (name, params) - + return (name, params) + def generate(self): - name = '_'.join(['cmd', self.cls.name, self.name]) + name = '_'.join(['cmd', self.cls.name, self.name]) params = ', '.join([p.name for p in self.params]) param_conv = ' + '.join([p.convert() for p in self.params]) - if len(param_conv) < 1: param_conv += "''" + if len(param_conv) < 1: param_conv += "''" func = """ def {name}({params}): header = np.array({hdr}, dtype=np.uint8).tostring() - params = {param_conv} + params = {param_conv} packet = header + params data = None send_message(packet, data)""".format(name=name, params=params, param_conv=param_conv, hdr=self.hdr()) return [func] + self.generate_response() def generate_response(self): + name = '_'.join(['rsp', self.cls.name, self.name]) if len(self.returns) == 0: - return [] - name = '_'.join(['rsp', self.cls.name, self.name]) - params = ', '.join([p.name for p in self.returns]) + params = '' + else: + params = ', '.join([p.name for p in self.returns]) func = """ def {name}({params}): pass""".format(name=name, params=params, hdr=self.hdr()) return [func] - + class EventEl(CommandEl): type_id = 'event' hilen = 0x80 - + def callback(self): - name = '_'.join(['evt', self.cls.name, self.name]) + name = '_'.join(['evt', self.cls.name, self.name]) params = ', '.join([p.type for p in self.params]) - return (name, params) - + return (name, params) + def generate(self): - name = '_'.join(['evt', self.cls.name, self.name]) + name = '_'.join(['evt', self.cls.name, self.name]) params = ', '.join([p.name for p in self.params]) func = """ def {name}({params}): pass""".format(name=name, params=params) return [func] - - + + class ParamsEl(BaseEl): type_id = 'params' def __init__(self, attrs): super(ParamsEl, self).__init__(attrs) self.params = [] - + def add_param(self, obj): self.params.append(obj) - + class ReturnsEl(ParamsEl): - type_id = 'returns' - + type_id = 'returns' + class ParamEl(BaseEl): type_id = 'param' size_lookup = {'bd_addr': 6, 'uint8array': 1} conv_lookup = {'bd_addr': 'np.uint8({0})', 'uint8array': 'np.uint8([])'} - + def size(self): if self.size_lookup.has_key(self.type): return self.size_lookup[self.type] return np.dtype(self.type).itemsize - + def convert(self): if self.type == 'uint8array': conv = 'np.uint8([len({0})] + {0})' elif self.type == 'bd_addr': conv = 'np.uint8({0})' else: - conv = 'np.' + self.type + '({0})' + conv = 'np.' + self.type + '({0})' return conv.format(self.name) + '.tostring()' - + class EnumsEl(BaseEl): type_id = 'enums' def __init__(self, attrs): super(EnumsEl, self).__init__(attrs) self.enums = [] - + def add_enum(self, obj): - self.enums.append(obj) - + self.enums.append(obj) + class EnumEl(BaseEl): - type_id = 'enum' + type_id = 'enum' def generate(self): return ['{0} = {1}'.format(self.name.upper(), self.value)] - + class IgnoreEl(BaseEl): def __init__(self, type_id, attrs): super(IgnoreEl, self).__init__(attrs) @@ -360,25 +364,25 @@ def __init__(self, type_id, attrs): parser = argparse.ArgumentParser() parser.add_argument('--api', help='ble api xml file', default='../api/bleapi.xml') args = parser.parse_args() - + p = xml.parsers.expat.ParserCreate() stack = [] api = None - + # 3 handler functions def start_element(name, attrs): if name in ELEMENTS: el = ELEMENTS[name](attrs) else: el = IgnoreEl(name, attrs) - + stack.append(el) def end_element(name): if stack[-1].type_id != name: raise RuntimeError('bad stack', stack[-1].type_id, name) - + el = stack.pop() if type(el) == IgnoreEl: pass @@ -389,12 +393,12 @@ def end_element(name): if type(stack[-1]) != IgnoreEl: func = getattr(stack[-1], 'add_' + name) func(el) - - + + def char_data(data): if len(data.strip('\n ')) > 0: raise RuntimeError('unexpected char data - ({0})'.format(data)) - + p.StartElementHandler = start_element p.EndElementHandler = end_element p.CharacterDataHandler = char_data @@ -408,6 +412,6 @@ def char_data(data): output += api.generate_callbacks() output += api.generate_commands() output += api.generate_events() - + with open('ble.py', 'w') as fout: fout.write('\n'.join(output))