diff --git a/main.py b/main.py index c486c40..a22a4e2 100644 --- a/main.py +++ b/main.py @@ -7,144 +7,146 @@ import random import utotp import machine -# ========== 新增:导入light库和线程模块 ========== import light import _thread -# 读取/生成TOTP密钥 -time_int=0 +# ===================== 硬件引脚配置 ===================== +# ✅ 已改用 GPIO2,避免与 UART TX (GPIO1) 冲突,确保串口打印正常 +PIN_MODE_SWITCH = 2 + +# ========== 全局常量定义 ========== +BUTTON_CMD_VCC = "button1" # VCC 对应按钮指令 1 +BUTTON_CMD_GND = "button2" # GND 对应按钮指令 2 + +# 读取/生成 TOTP 密钥 +time_int = 0 try: with open('totpsecret.txt', 'r') as f: - totp = f.read() # 先把文件内容读到变量里 - print(f"TOTP密钥: {totp}") # 再打印变量(避免重复读文件) + totp = f.read() + print(f"TOTP 密钥:{totp}") except OSError: - print("文件不存在或无法读取,将生成") - totp_secret = key.generate() # 注意:这里要确保`key`模块已正确导入 - print("生成的TOTP密钥:", totp_secret) - print("密钥长度:", len(totp_secret)) + print("文件不存在或无法读取,将生成") + totp_secret = key.generate() + print("生成的 TOTP 密钥:", totp_secret) with open('totpsecret.txt', 'w') as f: f.write(totp_secret) -# ========== 新增:全局连接状态(控制呼吸灯) ========== -is_connected = False # 初始未连接 - -# 新增:用于去重的全局变量 -last_handle_data = "" # 最后一次处理的数据 -last_handle_time = 0 # 最后一次处理的时间 -DEBOUNCE_TIME = 50 # 防抖时间(ms),避免短时间重复调用 +# ========== 全局变量 ========== +is_connected = False +last_handle_data = "" +last_handle_time = 0 +DEBOUNCE_TIME = 50 id = random.randint(1000, 9999) -print(f"设备ID: {id}") - -deviceId='3ed661ad-77a8-42b9-9100-3737e9317f36' +deviceId = '3ed661ad-77a8-42b9-9100-3737e9317f36' -# 全局变量定义(供函数调用) ble = bluetooth.BLE() current_conn = None -notify_handle = None # 特征值句柄,供发送函数使用 +notify_handle = None # ===================== 封装的核心函数 ===================== def ble_send(data_str): - """ - BLE数据发送函数 - :param data_str: 要发送的字符串数据 - :return: 发送状态(True=成功,False=失败) - """ + """发送数据并打印日志""" global current_conn, ble, notify_handle - # 1. 校验输入 if not isinstance(data_str, str) or not data_str.strip(): - print("发送失败:输入为空或不是字符串") return False - - # 2. 校验连接状态 if current_conn is None: - print("发送失败:当前无BLE设备连接") + # print("发送失败:当前无 BLE 设备连接") return False + + try: + max_len = ble.config("mtu") - 3 + except Exception: + max_len = 20 + + send_data = data_str[:max_len] - # 3. 处理MTU长度限制(MTU-3为实际可发送的最大字节数) - max_len = ble.config("mtu") - 3 - send_data = data_str[:max_len] # 截断超长数据 - - # 4. 编码并发送数据 try: ble.gatts_notify(current_conn, notify_handle, send_data.encode("utf-8")) - print(f"发送成功: {send_data}") + # print(f"BLE 发送成功:{send_data}") # 减少发送确认日志,保持清爽 return True except Exception as e: - print(f"发送异常:{e}") + print(f"BLE 发送异常:{e}") return False def blehandle(received_data): - global time_int - data=received_data - print('收到:'+data) - """处理接收到的BLE数据(添加日志便于调试)""" - print(f"[处理数据] {received_data}") + """处理接收到的 BLE 命令""" + print(f"[收到 BLE 指令]: {received_data}") - if data == 'getTotp': - print('custumeTime:'+str(time_int)+'\ttotp:'+str(utotp.generate_totp(totp,custume_time=time_int))) - totp_code = utotp.generate_totp(totp,custume_time=time_int) - print(totp_code) + if received_data == 'getTotp': + totp_code = utotp.generate_totp(totp, custume_time=time_int) ble_send(totp_code) - elif data == "getId": + elif received_data == "getId": ble_send(deviceId) - elif data.startswith("setTime:"): - print('int('+data+')') - time_int=int(data[8:]) + elif received_data.startswith("setTime:"): try: - # 提取时间戳字符串 timestamp_str = received_data.split(":", 1)[1] timestamp = int(timestamp_str) - - # 将 Unix 时间戳转换为 (year, month, day, weekday, hour, minute, second, subsecond) - # 注意:MicroPython 的 RTC.datetime() 使用的是本地时间,但通常我们用 UTC - import time - tm = time.gmtime(timestamp) # 转为 struct_time (UTC) - - # 格式:(year, month, day, weekday, hour, minute, second, subsecond) - # weekday: Monday=0, Sunday=6 → 但 MicroPython RTC 要求 Monday=1, Sunday=7 - rtc_tuple = ( - tm[0], # year - tm[1], # month - tm[2], # day - tm[6] + 1, # weekday: gmtime 返回 Monday=0 → +1 变成 Monday=1 - tm[3], # hour - tm[4], # minute - tm[5], # second - 0 # subsecond (ignored on most ports) - ) - - # 设置 RTC - import machine + import time as time_mod + tm = time_mod.gmtime(timestamp) + rtc_tuple = (tm[0], tm[1], tm[2], tm[6]+1, tm[3], tm[4], tm[5], 0) machine.RTC().datetime(rtc_tuple) - - # 验证是否设置成功 - current = time.time() - print(f"时间已设置!当前 Unix 时间: {current}, 设备时间: {time.localtime()}") - print(time.time()) ble_send(f"OK:setTime:{timestamp}") - except Exception as e: - print(f"设置时间失败: {e}") ble_send(f"ERROR:setTime:{e}") - print(time.time()) -# ========== 新增:呼吸灯线程函数 ========== +# ========== 呼吸灯与状态发包线程 ========== def breathing_light_thread(): - """呼吸灯控制线程:未连接→蓝色呼吸,已连接→绿色常亮""" + global is_connected + # 使用修改后的 PIN_MODE_SWITCH (GPIO2) + pin_switch = machine.Pin(PIN_MODE_SWITCH, machine.Pin.IN, machine.Pin.PULL_UP) + last_switch_val = None + while True: + switch_val = pin_switch.value() + + # --- 1. 检测状态变化 --- + if switch_val != last_switch_val: + last_switch_val = switch_val + + # 确定颜色逻辑 (保持一致性) + target_color = light.COLORS["GREEN"] if switch_val == 1 else light.COLORS["RED"] + + # --- 2. 实时串口打印 --- + current_time = time.localtime() + print("\n[GPIO 变更]: " + "="*30) + if switch_val == 1: + print(f"时间:{current_time[0]}-{current_time[1]}-{current_time[2]} {current_time[3]}:{current_time[4]}:{current_time[5]}") + print(f"引脚:GPIO{PIN_MODE_SWITCH} -> 高电平 (VCC)") + print(f"模式:{BUTTON_CMD_VCC} (绿灯常亮/呼吸)") + else: + print(f"时间:{current_time[0]}-{current_time[1]}-{current_time[2]} {current_time[3]}:{current_time[4]}:{current_time[5]}") + print(f"引脚:GPIO{PIN_MODE_SWITCH} -> 低电平 (GND)") + print(f"模式:{BUTTON_CMD_GND} (红灯常亮/呼吸)") + print("="*30 + "\n") + + # --- 3. 自动发送状态包 (核心需求) --- + cmd_to_send = BUTTON_CMD_VCC if switch_val == 1 else BUTTON_CMD_GND + + # 只有已连接时才发送蓝牙通知 + if is_connected: + if ble_send(cmd_to_send): + print(f"> 蓝牙状态包已推送:[{cmd_to_send}]") + else: + print(f"! 蓝牙未连接,暂存状态:{cmd_to_send}") + else: + print(f"! 设备未连接,跳过蓝牙发送,状态为:{cmd_to_send}") + + # --- 4. 控制 LED --- + target_color = light.COLORS["GREEN"] if switch_val == 1 else light.COLORS["RED"] + if not is_connected: - # 未连接:蓝色呼吸灯(speed=5为快速呼吸) - light.breathing_light(light.COLORS["BLUE"], speed=5) + # 未连接:呼吸闪烁 + light.breathing_light(target_color, speed=5) else: - # 已连接:绿色常亮,降低循环频率节省功耗 - light.set_rgb(light.COLORS["GREEN"]) + # 已连接:常亮 + light.set_rgb(target_color) time.sleep_ms(100) -# 2. UUID与BLE配置 +# ===================== BLE 初始化与配置 ===================== SERVICE_UUID = "d816e4c6-1b99-4da7-bcd5-7c37cc2642c4" CHARACTERISTIC_UUID = "d816e4c7-1b99-4da7-bcd5-7c37cc2642c4" @@ -158,15 +160,9 @@ def breathing_light_thread(): _FLAG_WRITE_NO_RESPONSE = const(0x0004) _FLAG_NOTIFY = const(0x0010) -# 初始化BLE ble.active(True) -ble.config( - addr_mode=0x01, - gap_name="esp32", - mtu=256 -) +ble.config(addr_mode=0x01, gap_name="esp32", mtu=256) -# 注册BLE服务 services = [ (bluetooth.UUID(SERVICE_UUID), ( (bluetooth.UUID(CHARACTERISTIC_UUID), @@ -174,9 +170,8 @@ def breathing_light_thread(): )), ] handles = ble.gatts_register_services(services) -notify_handle = handles[0][0] # 赋值给全局变量,供ble_send使用 +notify_handle = handles[0][0] -# 4. 广播数据(提前定义,修复disconnect事件中未定义的问题) DEVICE_NAME = "Cpen"+str(id) adv_data = b''.join([ b'\x02', b'\x01', b'\x06', @@ -184,114 +179,87 @@ def breathing_light_thread(): b'\x11', b'\x07', b'\xc4\x42\x26\xcc\x37\x7c\xd5\xbc\xd5\xa7\x99\x1b\xc6\xe4\x16\xd8' ]) -# 3. BLE事件回调(修复重复调用问题,新增连接状态更新) +# 事件回调 (修复了之前被截断的问题) def on_ble_event(event, data): - global current_conn, last_handle_data, last_handle_time, is_connected # 新增is_connected + global current_conn, last_handle_data, last_handle_time, is_connected - # 打印事件日志,便于调试 current_time = time.ticks_ms() - print(f"[BLE事件] 类型: {event}, 数据: {data}, 时间: {current_time}") if event == _IRQ_CENTRAL_CONNECT: conn_handle, _, _ = data current_conn = conn_handle - is_connected = True # 新增:标记为已连接 - print(f"已连接(句柄: {conn_handle})") + is_connected = True + print(f"[BLE] 连接建立 (Handle: {conn_handle})") ble.gap_advertise(None) elif event == _IRQ_CENTRAL_DISCONNECT: conn_handle, _, _ = data current_conn = None - is_connected = False # 新增:标记为未连接 - print(f"已断开(句柄: {conn_handle})") - # 修复:adv_data已提前定义 + is_connected = False + print(f"[BLE] 断开连接 (Handle: {conn_handle})") ble.gap_advertise(30000, adv_data=adv_data) elif event == _IRQ_GATTS_READ_REQUEST: - conn_handle, attr_handle = data - print(f"电脑端读请求(句柄: {attr_handle})") return 0 elif event == _IRQ_GATTS_WRITE: conn_handle, attr_handle = data - - # 1. 读取数据并处理编码异常 try: received_data = ble.gatts_read(attr_handle).decode("utf-8").strip() except UnicodeDecodeError: - print("数据解码失败,跳过处理") - # 清空特征值缓存 ble.gatts_write(attr_handle, b"") return - # 2. 去重+防抖过滤 + # 去重 + 防抖 if (received_data == last_handle_data and time.ticks_diff(current_time, last_handle_time) < DEBOUNCE_TIME): - print(f"跳过重复调用:{received_data}") - # 清空特征值缓存 ble.gatts_write(attr_handle, b"") return - # 3. 更新最后处理的信息 last_handle_data = received_data last_handle_time = current_time - # 4. 调用处理函数(仅在数据有效且非重复时) + # ✅ 此处已修复之前的截断错误 if received_data: blehandle(received_data) - # 5. 清空特征值缓存,避免重复读取 ble.gatts_write(attr_handle, b"") -# 注册事件回调 ble.irq(on_ble_event) -# ========== 新增:初始化呼吸灯并启动线程 ========== -# 初始化RGB灯(引脚48,可根据你的硬件修改pin_num参数) -light.init_rgb(pin_num=48) -# 启动呼吸灯独立线程(避免阻塞主线程) +# 初始化 LED 模块 +try: + light.init_rgb(pin_num=48) +except Exception as e: + print(f"LED 模块初始化警告:{e}") + +# 启动后台线程 _thread.start_new_thread(breathing_light_thread, ()) # 开始广播 ble.gap_advertise(30000, adv_data=adv_data) -print("="*40) -print("BLE初始化完成") -print(f"设备名称: {DEVICE_NAME}") -print(f"服务UUID: {SERVICE_UUID}") -print(f"特征UUID: {CHARACTERISTIC_UUID}") -print("等待设备连接...连接后输入内容并回车发送") -print("="*40) +print("-" * 40) +print("系统启动完成") +print(f"设备 ID: {id}") +print(f"GPIO 开关:{PIN_MODE_SWITCH}") +print(f"规则:VCC 发 button1 / GND 发 button2") +print("-" * 40) -# 5. 非阻塞输入(调用ble_send发送数据) +# 本地手动发送 (可选) input_buffer = "" -prompt_printed = True - -# 创建poll对象,监听stdin的可读事件 poll_obj = select.poll() poll_obj.register(sys.stdin, select.POLLIN) while True: - # 非阻塞检查输入 events = poll_obj.poll(0) if events: char = sys.stdin.read(1) if char == '\n': if input_buffer.strip(): - ble_send(input_buffer) # 调用封装的发送函数 - else: - print("\n输入为空,请输入内容后发送") - - # 清空缓冲区+重置提示符 + ble_send(input_buffer) input_buffer = "" - prompt_printed = False else: input_buffer += char - # 打印输入提示符 - if not prompt_printed: - print("> ", end="") - prompt_printed = True - - # 释放CPU time.sleep_ms(10)