diff --git a/docs/i18n.md b/docs/i18n.md new file mode 100644 index 000000000..34f846574 --- /dev/null +++ b/docs/i18n.md @@ -0,0 +1,156 @@ +# Uni-Lab-OS i18n 国际化支持 + +Uni-Lab-OS 现在支持多语言国际化(i18n),支持英文(en_US)和中文(zh_CN)。 + +## 使用方法 + +### 1. 设置语言 + +#### 通过环境变量 +```bash +export UNILABOS_LANGUAGE=zh_CN # 设置为中文 +export UNILABOS_LANGUAGE=en_US # 设置为英文(默认) +``` + +#### 通过代码 +```python +from unilabos.i18n import set_language + +# 切换到中文 +set_language("zh_CN") + +# 切换到英文 +set_language("en_US") +``` + +### 2. 在代码中使用翻译 + +#### 基础用法 +```python +from unilabos.i18n import _ + +# 简单翻译 +message = _("Hello World") +print(message) # 输出: 你好世界 (中文) 或 Hello World (英文) +``` + +#### 格式化字符串 +```python +from unilabos.i18n import _ + +# 使用 format 方法 +message = _("Found {count} missing packages").format(count=5) +print(message) # 输出: 发现 5 个缺失的包 (中文) +``` + +#### 复数形式 +```python +from unilabos.i18n import ngettext + +# 复数翻译 +message = ngettext("One item", "{count} items", item_count).format(count=item_count) +``` + +### 3. 支持的翻译函数 + +| 函数 | 用途 | 示例 | +|------|------|------| +| `_()` | 简单翻译 | `_("Hello")` | +| `ngettext()` | 复数形式 | `ngettext("1 item", "{n} items", count)` | +| `set_language()` | 设置语言 | `set_language("zh_CN")` | +| `get_current_language()` | 获取当前语言 | `get_current_language()` | + +## 添加新语言 + +### 1. 创建翻译目录 +```bash +mkdir -p unilabos/i18n/locales/xx_XX/LC_MESSAGES +``` + +### 2. 复制模板文件 +```bash +cp unilabos/i18n/locales/messages.pot unilabos/i18n/locales/xx_XX/LC_MESSAGES/messages.po +``` + +### 3. 翻译字符串 +编辑 `messages.po` 文件,填写 `msgstr`: +```po +msgid "Hello World" +msgstr "你好世界" +``` + +### 4. 编译翻译文件 +```bash +cd unilabos/i18n +python compile_translations.py +``` + +### 5. 更新支持的语言列表 +编辑 `unilabos/i18n/__init__.py`,在 `SUPPORTED_LANGUAGES` 中添加新语言: +```python +SUPPORTED_LANGUAGES = { + "en_US": "English", + "zh_CN": "中文", + "xx_XX": "New Language", # 添加新语言 +} +``` + +## 提取可翻译字符串 + +使用以下命令从源代码中提取可翻译字符串: + +```bash +# 安装 xgettext 工具(可选) +# 或手动添加字符串到 messages.pot 文件 +``` + +## 测试翻译 + +运行测试确保翻译正常工作: + +```bash +python tests/test_i18n.py +``` + +## 翻译文件结构 + +``` +unilabos/i18n/ +├── __init__.py # i18n 主模块 +├── compile_translations.py # 编译脚本 +├── locales/ +│ ├── messages.pot # 翻译模板 +│ ├── en_US/ +│ │ └── LC_MESSAGES/ +│ │ ├── messages.po # 英文翻译 +│ │ └── messages.mo # 编译后的英文翻译 +│ └── zh_CN/ +│ └── LC_MESSAGES/ +│ ├── messages.po # 中文翻译 +│ └── messages.mo # 编译后的中文翻译 +``` + +## 注意事项 + +1. **默认语言**: 默认语言是英文(en_US),不需要翻译 +2. **缺失翻译**: 如果某个字符串没有翻译,将显示原始字符串 +3. **格式保持**: 翻译时请保持原始字符串中的 `{variable}` 格式标记 +4. **编码**: 所有 PO 文件必须使用 UTF-8 编码 + +## 已翻译的内容 + +目前以下模块已支持 i18n: + +- `unilabos/app/main.py` - 主程序入口 +- `unilabos/utils/banner_print.py` - 横幅和状态打印 +- `unilabos/utils/environment_check.py` - 环境检查 + +## 贡献翻译 + +欢迎贡献更多语言的翻译!请: + +1. Fork 仓库 +2. 创建新的翻译文件 +3. 提交 PR + +参考 issue #32 获取更多关于 i18n 支持的信息。 diff --git a/tests/test_i18n.py b/tests/test_i18n.py new file mode 100644 index 000000000..29d2fd1cb --- /dev/null +++ b/tests/test_i18n.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Uni-Lab-OS i18n 国际化支持测试 + +测试内容: +1. 语言切换功能 +2. 翻译字符串匹配 +3. 翻译文件加载 +""" + +import os +import sys +import unittest + +# 添加项目路径 +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from unilabos.i18n import _, ngettext, set_language, get_current_language, SUPPORTED_LANGUAGES + + +class TestI18n(unittest.TestCase): + """i18n 功能测试""" + + def setUp(self): + """测试前设置默认语言""" + set_language("en_US") + + def tearDown(self): + """测试后恢复默认语言""" + set_language("en_US") + + def test_supported_languages(self): + """测试支持的语言列表""" + self.assertIn("en_US", SUPPORTED_LANGUAGES) + self.assertIn("zh_CN", SUPPORTED_LANGUAGES) + self.assertEqual(SUPPORTED_LANGUAGES["en_US"], "English") + self.assertEqual(SUPPORTED_LANGUAGES["zh_CN"], "中文") + + def test_set_language(self): + """测试语言切换""" + # 测试有效语言 + self.assertTrue(set_language("en_US")) + self.assertEqual(get_current_language(), "en_US") + + self.assertTrue(set_language("zh_CN")) + self.assertEqual(get_current_language(), "zh_CN") + + # 测试无效语言 + self.assertFalse(set_language("invalid_lang")) + # 语言应该保持之前的设置 + self.assertEqual(get_current_language(), "zh_CN") + + def test_english_translation(self): + """测试英文翻译(默认语言,应返回原字符串)""" + set_language("en_US") + + # 英文是默认语言,应该返回原字符串 + self.assertEqual(_("Version:"), "Version:") + self.assertEqual(_("System:"), "System:") + self.assertEqual(_("Configuration:"), "Configuration:") + + def test_chinese_translation(self): + """测试中文翻译""" + set_language("zh_CN") + + # 测试已翻译的字符串 + self.assertEqual(_("Version:"), "版本:") + self.assertEqual(_("System:"), "系统:") + self.assertEqual(_("Configuration:"), "配置:") + self.assertEqual(_("Backend:"), "后端:") + self.assertEqual(_("INFO"), "信息") + self.assertEqual(_("SUCCESS"), "成功") + self.assertEqual(_("WARNING"), "警告") + self.assertEqual(_("ERROR"), "错误") + + def test_missing_translation(self): + """测试缺失翻译时应返回原字符串""" + set_language("zh_CN") + + # 测试未翻译的字符串应该返回原字符串 + untranslated = "This string does not exist in translations" + self.assertEqual(_(untranslated), untranslated) + + def test_formatted_strings(self): + """测试格式化字符串翻译""" + set_language("zh_CN") + + # 测试带格式的字符串 + result = _("发现 {count} 个缺失的包").format(count=5) + self.assertEqual(result, "发现 5 个缺失的包") + + result = _("设备信息已保存到 {path}").format(path="/tmp/test.json") + self.assertEqual(result, "设备信息已保存到 /tmp/test.json") + + def test_ngettext(self): + """测试复数形式翻译""" + # 英文测试 + set_language("en_US") + self.assertEqual(ngettext("One item", "{n} items", 1), "One item") + self.assertEqual(ngettext("One item", "{n} items", 5), "{n} items") + + # 中文测试(中文没有复数变化,应该返回复数形式) + set_language("zh_CN") + result = ngettext("One item", "{n} items", 1) + # 中文应该使用复数形式 + self.assertIn(result, ["{n} items"]) # 如果未翻译则返回原字符串 + + def test_translation_files_exist(self): + """测试翻译文件是否存在""" + from unilabos.i18n import get_locale_dir + + locale_dir = get_locale_dir() + + # 检查英文翻译文件 + en_po = os.path.join(locale_dir, "en_US", "LC_MESSAGES", "messages.po") + en_mo = os.path.join(locale_dir, "en_US", "LC_MESSAGES", "messages.mo") + self.assertTrue(os.path.exists(en_po), f"English PO file not found: {en_po}") + self.assertTrue(os.path.exists(en_mo), f"English MO file not found: {en_mo}") + + # 检查中文翻译文件 + zh_po = os.path.join(locale_dir, "zh_CN", "LC_MESSAGES", "messages.po") + zh_mo = os.path.join(locale_dir, "zh_CN", "LC_MESSAGES", "messages.mo") + self.assertTrue(os.path.exists(zh_po), f"Chinese PO file not found: {zh_po}") + self.assertTrue(os.path.exists(zh_mo), f"Chinese MO file not found: {zh_mo}") + + def test_translation_content(self): + """测试翻译文件内容""" + from unilabos.i18n import get_locale_dir, _load_po_file + + # 加载中文翻译 + zh_translations = _load_po_file("zh_CN") + + # 验证关键翻译存在 + self.assertIn("Version:", zh_translations) + self.assertEqual(zh_translations["Version:"], "版本:") + + self.assertIn("INFO", zh_translations) + self.assertEqual(zh_translations["INFO"], "信息") + + def test_environment_variable(self): + """测试环境变量设置语言""" + import importlib + + # 保存原始环境变量 + original_lang = os.environ.get("UNILABOS_LANGUAGE") + + try: + # 设置中文环境变量 + os.environ["UNILABOS_LANGUAGE"] = "zh_CN" + + # 重新加载模块以触发 _init_language + if "unilabos.i18n" in sys.modules: + del sys.modules["unilabos.i18n"] + + from unilabos.i18n import get_current_language + + # 验证语言已切换 + self.assertEqual(get_current_language(), "zh_CN") + + finally: + # 恢复原始环境变量 + if original_lang is not None: + os.environ["UNILABOS_LANGUAGE"] = original_lang + elif "UNILABOS_LANGUAGE" in os.environ: + del os.environ["UNILABOS_LANGUAGE"] + + # 恢复默认语言 + set_language("en_US") + + +class TestI18nIntegration(unittest.TestCase): + """i18n 集成测试 - 测试实际使用场景""" + + def setUp(self): + """测试前设置为英文""" + set_language("en_US") + + def tearDown(self): + """测试后恢复默认语言""" + set_language("en_US") + + def test_banner_print_integration(self): + """测试横幅打印功能存在并可调用""" + from unilabos.utils.banner_print import print_status + + # 简单测试函数可以被调用不抛出异常 + import io + import sys + + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + + try: + print_status("Test message", "info") + output = captured.getvalue() + # 只要有输出即可,不管语言 + self.assertTrue(len(output) > 0) + self.assertIn("Test message", output) + finally: + sys.stdout = old_stdout + + def test_i18n_core_functionality(self): + """测试 i18n 核心功能 - 语言切换""" + # 测试语言切换功能 + set_language("en_US") + self.assertEqual(get_current_language(), "en_US") + + set_language("zh_CN") + self.assertEqual(get_current_language(), "zh_CN") + + # 测试翻译 + set_language("en_US") + self.assertEqual(_("Version:"), "Version:") + + set_language("zh_CN") + self.assertEqual(_("Version:"), "版本:") + + +def run_tests(): + """运行所有测试""" + # 创建测试套件 + loader = unittest.TestLoader() + suite = unittest.TestSuite() + + # 添加测试类 + suite.addTests(loader.loadTestsFromTestCase(TestI18n)) + suite.addTests(loader.loadTestsFromTestCase(TestI18nIntegration)) + + # 运行测试 + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + # 返回测试结果 + return result.wasSuccessful() + + +if __name__ == "__main__": + success = run_tests() + sys.exit(0 if success else 1) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 937512627..b25136728 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -21,6 +21,14 @@ from unilabos.utils.banner_print import print_status, print_unilab_banner from unilabos.config.config import load_config, BasicConfig, HTTPConfig +# 导入 i18n 支持 +try: + from unilabos.i18n import _ +except ImportError: + # 如果 i18n 模块不可用,使用空翻译 + def _(message: str) -> str: + return message + # Global restart flags (used by ws_client and web/server) _restart_requested: bool = False _restart_reason: str = "" @@ -31,13 +39,13 @@ def load_config_from_file(config_path): config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None) if config_path: if not os.path.exists(config_path): - print_status(f"配置文件 {config_path} 不存在", "error") + print_status(_("配置文件 {config_path} 不存在").format(config_path=config_path), "error") elif not config_path.endswith(".py"): - print_status(f"配置文件 {config_path} 不是Python文件,必须以.py结尾", "error") + print_status(_("配置文件 {config_path} 不是Python文件,必须以.py结尾").format(config_path=config_path), "error") else: load_config(config_path) else: - print_status(f"启动 UniLab-OS时,配置文件参数未正确传入 --config '{config_path}' 尝试本地配置...", "warning") + print_status(_("启动 UniLab-OS时,配置文件参数未正确传入 --config '{config_path}' 尝试本地配置...").format(config_path=config_path), "warning") load_config(config_path) @@ -54,23 +62,23 @@ def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser): def parse_args(): """解析命令行参数""" - parser = argparse.ArgumentParser(description="Start Uni-Lab Edge server.") + parser = argparse.ArgumentParser(description=_("Start Uni-Lab Edge server.")) subparsers = parser.add_subparsers(title="Valid subcommands", dest="command") - parser.add_argument("-g", "--graph", help="Physical setup graph file path.") - parser.add_argument("-c", "--controllers", default=None, help="Controllers config file path.") + parser.add_argument("-g", "--graph", help=_("Physical setup graph file path.")) + parser.add_argument("-c", "--controllers", default=None, help=_("Controllers config file path.")) parser.add_argument( "--registry_path", type=str, default=None, action="append", - help="Path to the registry directory", + help=_("Path to the registry directory"), ) parser.add_argument( "--working_dir", type=str, default=None, - help="Path to the working directory", + help=_("Path to the working directory"), ) parser.add_argument( "--backend", @@ -87,44 +95,44 @@ def parse_args(): parser.add_argument( "--is_slave", action="store_true", - help="Run the backend as slave node (without host privileges).", + help=_("Run the backend as slave node (without host privileges)."), ) parser.add_argument( "--slave_no_host", action="store_true", - help="Skip waiting for host service in slave mode", + help=_("Skip waiting for host service in slave mode"), ) parser.add_argument( "--upload_registry", action="store_true", - help="Upload registry information when starting unilab", + help=_("Upload registry information when starting unilab"), ) parser.add_argument( "--use_remote_resource", action="store_true", - help="Use remote resources when starting unilab", + help=_("Use remote resources when starting unilab"), ) parser.add_argument( "--config", type=str, default=None, - help="Configuration file path, supports .py format Python config files", + help=_("Configuration file path, supports .py format Python config files"), ) parser.add_argument( "--port", type=int, default=None, - help="Port for web service information page", + help=_("Port for web service information page"), ) parser.add_argument( "--disable_browser", action="store_true", - help="Disable opening information page on startup", + help=_("Disable opening information page on startup"), ) parser.add_argument( "--2d_vis", action="store_true", - help="Enable 2D visualization when starting pylabrobot instance", + help=_("Enable 2D visualization when starting pylabrobot instance"), ) parser.add_argument( "--visual", @@ -136,30 +144,30 @@ def parse_args(): "--ak", type=str, default="", - help="Access key for laboratory requests", + help=_("Access key for laboratory requests"), ) parser.add_argument( "--sk", type=str, default="", - help="Secret key for laboratory requests", + help=_("Secret key for laboratory requests"), ) parser.add_argument( "--addr", type=str, default="https://uni-lab.bohrium.com/api/v1", - help="Laboratory backend address", + help=_("Laboratory backend address"), ) parser.add_argument( "--skip_env_check", action="store_true", - help="Skip environment dependency check on startup", + help=_("Skip environment dependency check on startup"), ) parser.add_argument( "--complete_registry", action="store_true", default=False, - help="Complete registry information", + help=_("Complete registry information"), ) parser.add_argument( "--check_mode", @@ -170,7 +178,7 @@ def parse_args(): parser.add_argument( "--no_update_feedback", action="store_true", - help="Disable sending update feedback to server", + help=_("Disable sending update feedback to server"), ) parser.add_argument( "--test_mode", @@ -189,7 +197,7 @@ def parse_args(): "--workflow_file", type=str, required=True, - help="Path to the workflow file (JSON format)", + help=_("Path to the workflow file (JSON format)"), ) workflow_parser.add_argument( "-n", @@ -203,13 +211,13 @@ def parse_args(): type=str, nargs="*", default=[], - help="Tags for the workflow (space-separated)", + help=_("Tags for the workflow (space-separated)"), ) workflow_parser.add_argument( "--published", action="store_true", default=False, - help="Whether to publish the workflow (default: False)", + help=_("Whether to publish the workflow (default: False)"), ) workflow_parser.add_argument( "--description", @@ -236,10 +244,10 @@ def main(): from unilabos.utils.environment_check import check_environment if not check_environment(auto_install=True): - print_status("环境检查失败,程序退出", "error") + print_status(_("环境检查失败,程序退出"), "error") os._exit(1) else: - print_status("跳过环境依赖检查", "warning") + print_status(_("跳过环境依赖检查"), "warning") # 加载配置文件,优先加载config,然后从env读取 config_path = args_dict.get("config") @@ -300,7 +308,7 @@ def main(): os._exit(1) # 加载配置文件 (check_mode 跳过) - print_status(f"当前工作目录为 {working_dir}", "info") + print_status(_("当前工作目录为 {working_dir}").format(working_dir=working_dir)), "info") if not check_mode: load_config_from_file(config_path) @@ -315,13 +323,13 @@ def main(): if args.addr != parser.get_default("addr"): if args.addr == "test": - print_status("使用测试环境地址", "info") + print_status(_("使用测试环境地址"), "info") HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1" elif args.addr == "uat": - print_status("使用uat环境地址", "info") + print_status(_("使用uat环境地址"), "info") HTTPConfig.remote_addr = "https://uni-lab.uat.bohrium.com/api/v1" elif args.addr == "local": - print_status("使用本地环境地址", "info") + print_status(_("使用本地环境地址"), "info") HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1" else: HTTPConfig.remote_addr = args.addr @@ -329,25 +337,25 @@ def main(): # 设置BasicConfig参数 if args_dict.get("ak", ""): BasicConfig.ak = args_dict.get("ak", "") - print_status("传入了ak参数,优先采用传入参数!", "info") + print_status(_("传入了ak参数,优先采用传入参数!"), "info") if args_dict.get("sk", ""): BasicConfig.sk = args_dict.get("sk", "") - print_status("传入了sk参数,优先采用传入参数!", "info") + print_status(_("传入了sk参数,优先采用传入参数!"), "info") BasicConfig.working_dir = working_dir workflow_upload = args_dict.get("command") in ("workflow_upload", "wf") # 使用远程资源启动 if not workflow_upload and args_dict["use_remote_resource"]: - print_status("使用远程资源启动", "info") + print_status(_("使用远程资源启动"), "info") from unilabos.app.web import http_client res = http_client.resource_get("host_node", False) if str(res.get("code", 0)) == "0" and len(res.get("data", [])) > 0: - print_status("远程资源已存在,使用云端物料!", "info") + print_status(_("远程资源已存在,使用云端物料!"), "info") args_dict["graph"] = None else: - print_status("远程资源不存在,本地将进行首次上报!", "info") + print_status(_("远程资源不存在,本地将进行首次上报!"), "info") BasicConfig.port = args_dict["port"] if args_dict["port"] else BasicConfig.port BasicConfig.disable_browser = args_dict["disable_browser"] or BasicConfig.disable_browser @@ -388,33 +396,33 @@ def main(): # Check mode: complete_registry 完成后直接退出,git diff 检测由 CI workflow 执行 if check_mode: - print_status("Check mode: complete_registry 完成,退出", "info") + print_status(_("Check mode: complete_registry 完成,退出"), "info") os._exit(0) if BasicConfig.upload_registry: # 设备注册到服务端 - 需要 ak 和 sk if BasicConfig.ak and BasicConfig.sk: - print_status("开始注册设备到服务端...", "info") + print_status(_("开始注册设备到服务端..."), "info") try: register_devices_and_resources(lab_registry) - print_status("设备注册完成", "info") + print_status(_("设备注册完成"), "info") except Exception as e: - print_status(f"设备注册失败: {e}", "error") + print_status(_("设备注册失败: {error}").format(error=e)), "error") else: - print_status("未提供 ak 和 sk,跳过设备注册", "info") + print_status(_("未提供 ak 和 sk,跳过设备注册"), "info") else: - print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning") + print_status(_("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry"), "warning") # 处理 workflow_upload 子命令 if workflow_upload: from unilabos.workflow.wf_utils import handle_workflow_upload_command handle_workflow_upload_command(args_dict) - print_status("工作流上传完成,程序退出", "info") + print_status(_("工作流上传完成,程序退出"), "info") os._exit(0) if not BasicConfig.ak or not BasicConfig.sk: - print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") + print_status(_("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!"), "warning") os._exit(1) graph: nx.Graph resource_tree_set: ResourceTreeSet diff --git a/unilabos/devices/altai_dam/__init__.py b/unilabos/devices/altai_dam/__init__.py new file mode 100644 index 000000000..8963b6c0c --- /dev/null +++ b/unilabos/devices/altai_dam/__init__.py @@ -0,0 +1,12 @@ +""" +阿尔泰科技DAM3000M系列控制仪表设备支持 + +支持的设备: +- DAM3060V: 4通道模拟输出模块 +- DAM3151: 32通道模拟输入模块 +""" + +from unilabos.devices.altai_dam.dam3060v import DAM3060V +from unilabos.devices.altai_dam.dam3151 import DAM3151 + +__all__ = ["DAM3060V", "DAM3151"] \ No newline at end of file diff --git a/unilabos/devices/altai_dam/dam3060v.py b/unilabos/devices/altai_dam/dam3060v.py new file mode 100644 index 000000000..f9ec3744f --- /dev/null +++ b/unilabos/devices/altai_dam/dam3060v.py @@ -0,0 +1,159 @@ +""" +DAM3060V - 4通道模拟输出模块 + +输出范围: +- Mode 0: -10V ~ 10V (range_code=9) +- Mode 1: -5V ~ 5V (range_code=8) +- Mode 2: 0V ~ 10V (range_code=14) +- Mode 3: 0V ~ 5V (range_code=13) +""" + +from math import ceil +from typing import Tuple, Dict +import ctypes +from ctypes import wintypes + +from unilabos.devices.altai_dam.dam_base import DAMDeviceBase + + +class DAM3060V(DAMDeviceBase): + """ + DAM3060V - 4通道模拟输出模块 + + 每个通道可独立设置输出范围和输出电压 + """ + + # 设备参数 + AOAddr = 352 # 模拟输出寄存器起始地址 + RangeAddr = 272 # 量程设置寄存器起始地址 + ChannelNum = 4 # 通道数 + RangeNum = 4 # 量程模式数 + ChannelAddr = 2 # 通道地址间隔 + + # 量程模式映射 (range_code: (最小值, 最大值)) + RangeModes: Dict[int, Tuple[float, float]] = { + 9: (-10.0, 10.0), # Mode 0: -10V to 10V + 8: (-5.0, 5.0), # Mode 1: -5V to 5V + 14: (0.0, 10.0), # Mode 2: 0V to 10V + 13: (0.0, 5.0) # Mode 3: 0V to 5V + } + + # 默认量程模式 (每个通道) + DefaultModeList = [8, 8, 8, 8] # 默认全部为 -5V ~ 5V + + def __init__(self, com_id: int, baud_rate: int, device_id: int, + mode_list: list = None, dll_path: str = None): + """ + 初始化DAM3060V模块 + + Args: + com_id: 串口号 + baud_rate: 波特率代码 + device_id: 设备ID + mode_list: 各通道量程模式列表 (长度为4,元素为range_code) + dll_path: DLL文件路径 + """ + super().__init__(com_id, baud_rate, device_id, dll_path) + + # 初始化通道量程模式 + self.channel_modes = {} + mode_list = mode_list or self.DefaultModeList + + # 设置各通道量程 + for channel in range(self.ChannelNum): + self.set_output_range_mode(channel, mode_list[channel]) + + def set_output_range_mode(self, channel: int, range_code: int): + """ + 设置通道输出范围模式 + + Args: + channel: 通道号 (0-3) + range_code: 量程代码 (8, 9, 13, 14) + + Raises: + ValueError: 参数错误 + RuntimeError: 设置失败 + """ + if channel not in range(self.ChannelNum): + raise ValueError(f"通道号必须在 0-{self.ChannelNum-1} 之间") + + if range_code not in self.RangeModes: + raise ValueError(f"量程代码必须是 {list(self.RangeModes.keys())} 之一") + + reg_addr = self.RangeAddr + channel + if not self.write_single_reg(reg_addr, range_code): + raise RuntimeError( + f"设置量程失败: handle={self.handle}, device_id={self.device_id}, " + f"addr={reg_addr}, mode={range_code}" + ) + + self.channel_modes[channel] = range_code + + def set_analog_output(self, channel: int, voltage: float): + """ + 设置模拟输出电压 + + Args: + channel: 通道号 (0-3) + voltage: 输出电压 (V) + + Raises: + ValueError: 参数错误或电压超出范围 + RuntimeError: 设置失败 + """ + if channel not in range(self.ChannelNum): + raise ValueError(f"通道号必须在 0-{self.ChannelNum-1} 之间") + + if channel not in self.channel_modes: + raise ValueError(f"通道 {channel} 尚未设置量程模式") + + range_code = self.channel_modes[channel] + range_bottom, range_top = self.RangeModes[range_code] + + # 检查电压范围 + if not (range_bottom <= voltage <= range_top): + raise ValueError( + f"电压 {voltage}V 超出范围 [{range_bottom}V, {range_top}V]" + ) + + # 计算数字值 (12位精度) + dal_lsb = ceil((voltage - range_bottom) * 0xFFF / (range_top - range_bottom)) + + # 写入输出值 + reg_addr = self.AOAddr + channel * self.ChannelAddr + + # 使用写多寄存器 (UInt32) + self._dam3000m.DAM3000M_WriteMultiRegsUInt32.argtypes = [ + wintypes.HANDLE, wintypes.LONG, wintypes.LONG, + wintypes.LONG, ctypes.POINTER(wintypes.ULONG) + ] + self._dam3000m.DAM3000M_WriteMultiRegsUInt32.restype = wintypes.BOOL + + if not self._dam3000m.DAM3000M_WriteMultiRegsUInt32( + self.handle, self.device_id, reg_addr, 1, + ctypes.byref(ctypes.c_ulong(dal_lsb)) + ): + raise RuntimeError(f"设置模拟输出失败: 通道 {channel}, 电压 {voltage}V") + + def get_range_mode(self, channel: int) -> Tuple[float, float]: + """ + 获取通道的当前量程范围 + + Args: + channel: 通道号 (0-3) + + Returns: + (最小电压, 最大电压) 元组 + """ + if channel not in self.channel_modes: + raise ValueError(f"通道 {channel} 尚未设置量程模式") + + return self.RangeModes[self.channel_modes[channel]] + + def get_all_channels_range(self) -> Dict[int, Tuple[float, float]]: + """获取所有通道的量程范围""" + return { + channel: self.RangeModes[mode] + for channel, mode in self.channel_modes.items() + } \ No newline at end of file diff --git a/unilabos/devices/altai_dam/dam3151.py b/unilabos/devices/altai_dam/dam3151.py new file mode 100644 index 000000000..360d17be4 --- /dev/null +++ b/unilabos/devices/altai_dam/dam3151.py @@ -0,0 +1,228 @@ +""" +DAM3151 - 32通道模拟输入模块 + +测量范围: +电压: +- -10V ~ 10V (range_code=9) +- -5V ~ 5V (range_code=8) +- -1V ~ 1V (range_code=6) +- -500mV ~ 500mV (range_code=5) +- -150mV ~ 150mV (range_code=4) +- 0V ~ 10V (range_code=14) +- 0V ~ 5V (range_code=13) +- 1V ~ 5V (range_code=130) + +电流: +- -20mA ~ 20mA (range_code=10) +- 0mA ~ 20mA (range_code=11) +- 4mA ~ 20mA (range_code=12) +- 0mA ~ 22mA (range_code=128) +""" + +from typing import Tuple, Dict, List +import ctypes +from ctypes import wintypes + +from unilabos.devices.altai_dam.dam_base import DAMDeviceBase + + +class DAM3151(DAMDeviceBase): + """ + DAM3151 - 32通道模拟输入模块 + + 支持32通道同步采集,可测量电压或电流信号 + """ + + # 设备参数 + ChannelNum = 32 # 通道数 + RangeNum = 12 # 量程模式数 + RangeAddr = 136 # 量程设置寄存器起始地址 (137-1) + CHEnableAddr = 221 # 通道使能寄存器地址 + ADAddr = 0 # A/D转换数据起始地址 + fLsbType = 65535.0 # LSB转换系数 + + # 量程模式映射 (range_code: (最小值, 最大值, 单位)) + RangeModes: Dict[int, Tuple[float, float, str]] = { + 9: (-10.0, 10.0, "V"), # -10V to 10V + 8: (-5.0, 5.0, "V"), # -5V to 5V + 6: (-1.0, 1.0, "V"), # -1V to 1V + 5: (-0.5, 0.5, "V"), # -500mV to 500mV + 4: (-0.15, 0.15, "V"), # -150mV to 150mV + 14: (0.0, 10.0, "V"), # 0V to 10V + 13: (0.0, 5.0, "V"), # 0V to 5V + 130: (1.0, 5.0, "V"), # 1V to 5V + 10: (-20.0, 20.0, "mA"), # -20mA to 20mA + 11: (0.0, 20.0, "mA"), # 0mA to 20mA + 12: (4.0, 20.0, "mA"), # 4mA to 20mA + 128: (0.0, 22.0, "mA") # 0mA to 22mA + } + + # 默认量程模式 (交替使用电流和电压模式) + DefaultModeList = [10 if i % 2 == 0 else 8 for i in range(ChannelNum)] + + def __init__(self, com_id: int, baud_rate: int, device_id: int, + mode_list: list = None, dll_path: str = None): + """ + 初始化DAM3151模块 + + Args: + com_id: 串口号 + baud_rate: 波特率代码 + device_id: 设备ID + mode_list: 各通道量程模式列表 (长度为32,元素为range_code) + dll_path: DLL文件路径 + """ + super().__init__(com_id, baud_rate, device_id, dll_path) + + # 初始化通道量程模式 + self.channel_modes = {} + mode_list = mode_list or self.DefaultModeList + + # 使能所有通道 + self._enable_all_channels() + + # 设置各通道量程 + for channel in range(self.ChannelNum): + self.set_measurement_range_mode(channel, mode_list[channel]) + + def _enable_all_channels(self): + """使能所有32个通道""" + mask = (1 << self.ChannelNum) - 1 # 0xFFFFFFFF + if not self.write_single_reg(self.CHEnableAddr, mask): + raise RuntimeError( + f"使能通道失败: handle={self.handle}, device_id={self.device_id}, " + f"addr={self.CHEnableAddr}" + ) + + def set_measurement_range_mode(self, channel: int, range_code: int): + """ + 设置通道测量范围模式 + + Args: + channel: 通道号 (0-31) + range_code: 量程代码 + + Raises: + ValueError: 参数错误 + RuntimeError: 设置失败 + """ + if channel not in range(self.ChannelNum): + raise ValueError(f"通道号必须在 0-{self.ChannelNum-1} 之间") + + if range_code not in self.RangeModes: + raise ValueError(f"量程代码必须是 {list(self.RangeModes.keys())} 之一") + + reg_addr = self.RangeAddr + channel + if not self.write_single_reg(reg_addr, range_code): + raise RuntimeError( + f"设置量程失败: handle={self.handle}, device_id={self.device_id}, " + f"addr={reg_addr}, mode={range_code}" + ) + + self.channel_modes[channel] = range_code + + def _data_converter(self, raw_data: int, range_top: float, range_bottom: float) -> float: + """ + 将原始数据转换为实际测量值 + + Args: + raw_data: 原始A/D转换值 + range_top: 量程上限 + range_bottom: 量程下限 + + Returns: + 实际测量值 (V 或 mA) + """ + return raw_data / self.fLsbType * (range_top - range_bottom) + range_bottom + + def measure_all_channels(self) -> List[float]: + """ + 测量所有32个通道 + + Returns: + 测量值列表 (长度为32,单位为 V 或 mA) + """ + # 读取所有通道数据 + raw_data = self.read_input_regs_uint16(self.ADAddr, self.ChannelNum) + + # 转换为实际值 + measurements = [] + for channel in range(self.ChannelNum): + if channel not in self.channel_modes: + measurements.append(None) + continue + + range_code = self.channel_modes[channel] + range_bottom, range_top, unit = self.RangeModes[range_code] + + value = self._data_converter(raw_data[channel], range_top, range_bottom) + measurements.append(value) + + return measurements + + def measure_channel(self, channel: int) -> float: + """ + 测量单个通道 + + Args: + channel: 通道号 (0-31) + + Returns: + 测量值 (V 或 mA) + """ + if channel not in range(self.ChannelNum): + raise ValueError(f"通道号必须在 0-{self.ChannelNum-1} 之间") + + if channel not in self.channel_modes: + raise ValueError(f"通道 {channel} 尚未设置量程模式") + + # 读取单个通道数据 + raw_data = self.read_input_regs_uint16(self.ADAddr + channel, 1)[0] + + # 转换为实际值 + range_code = self.channel_modes[channel] + range_bottom, range_top, unit = self.RangeModes[range_code] + + return self._data_converter(raw_data, range_top, range_bottom) + + def get_range_mode(self, channel: int) -> Tuple[float, float, str]: + """ + 获取通道的当前量程范围 + + Args: + channel: 通道号 (0-31) + + Returns: + (最小值, 最大值, 单位) 元组 + """ + if channel not in self.channel_modes: + raise ValueError(f"通道 {channel} 尚未设置量程模式") + + return self.RangeModes[self.channel_modes[channel]] + + def get_all_channels_range(self) -> Dict[int, Tuple[float, float, str]]: + """获取所有通道的量程范围""" + return { + channel: self.RangeModes[mode] + for channel, mode in self.channel_modes.items() + } + + def measure_current_channels(self) -> List[float]: + """ + 测量所有电流模式通道 (偶数通道) + + Returns: + 电流值列表 (mA) + """ + all_data = self.measure_all_channels() + return [all_data[i] for i in range(0, self.ChannelNum, 2)] + + def measure_voltage_channels(self) -> List[float]: + """ + 测量所有电压模式通道 (奇数通道) + + Returns: + 电压值列表 (V) + """ + all_data = self.measure_all_channels() + return [all_data[i] for i in range(1, self.ChannelNum, 2)] \ No newline at end of file diff --git a/unilabos/devices/altai_dam/dam_base.py b/unilabos/devices/altai_dam/dam_base.py new file mode 100644 index 000000000..35d5958e8 --- /dev/null +++ b/unilabos/devices/altai_dam/dam_base.py @@ -0,0 +1,163 @@ +""" +阿尔泰科技DAM3000M系列设备基类 +""" + +import ctypes +from ctypes import wintypes +from typing import Dict, Optional +import platform + + +class DeviceInfo(ctypes.Structure): + """设备信息结构体""" + _fields_ = [ + ("DeviceType", wintypes.LONG), + ("TypeSuffix", wintypes.LONG), + ("ModusType", wintypes.LONG), + ("VesionID", wintypes.LONG), + ("DeviceID", wintypes.LONG), + ("BaudRate", wintypes.LONG), + ("bParity", wintypes.LONG) + ] + + +class DAMDeviceBase: + """ + 阿尔泰DAM3000M系列设备基类 + + 提供基本的设备连接、初始化和通信功能 + """ + + # 类级别的句柄缓存,支持同一串口共享句柄 + _handles: Dict[int, wintypes.HANDLE] = {} + + def __init__(self, com_id: int, baud_rate: int, device_id: int, dll_path: Optional[str] = None): + """ + 初始化DAM设备 + + Args: + com_id: 串口号 (例如: 4 表示 COM4) + baud_rate: 波特率代码 (0-7) + 0: 1200 bps + 1: 2400 bps + 2: 4800 bps + 3: 9600 bps + 4: 19200 bps + 5: 38400 bps + 6: 57600 bps + 7: 115200 bps + device_id: 设备ID (Modbus地址) + dll_path: DLL文件路径 (可选,默认为当前目录下的DAM3000M_64.dll) + """ + if platform.system() != "Windows": + raise RuntimeError("阿尔泰DAM3000M设备仅支持Windows系统") + + self.com_id = com_id + self.baud_rate = baud_rate + self.device_id = device_id + self.dll_path = dll_path or "./DAM3000M_64.dll" + + # 加载DLL + self._dam3000m = ctypes.WinDLL(self.dll_path) + self._setup_dll_functions() + + # 获取设备句柄 + self.handle = self._get_handle() + + # 获取设备信息 + self.device_info = self._get_device_info(device_id) + + def _setup_dll_functions(self): + """设置DLL函数原型""" + # 创建设备 + self._dam3000m.DAM3000M_CreateDevice.argtypes = [wintypes.LONG] + self._dam3000m.DAM3000M_CreateDevice.restype = wintypes.HANDLE + + # 初始化设备 + self._dam3000m.DAM3000M_InitDevice.argtypes = [ + wintypes.HANDLE, wintypes.LONG, wintypes.LONG, + wintypes.LONG, wintypes.LONG, wintypes.LONG, wintypes.LONG + ] + self._dam3000m.DAM3000M_InitDevice.restype = wintypes.BOOL + + # 释放设备 + self._dam3000m.DAM3000M_ReleaseDevice.argtypes = [wintypes.HANDLE] + self._dam3000m.DAM3000M_ReleaseDevice.restype = wintypes.BOOL + + # 获取设备信息 + self._dam3000m.DAM3000M_GetDeviceInfo.argtypes = [ + wintypes.HANDLE, wintypes.LONG, ctypes.POINTER(DeviceInfo) + ] + self._dam3000m.DAM3000M_GetDeviceInfo.restype = wintypes.BOOL + + # 写单个寄存器 + self._dam3000m.DAM3000M_WriteSingleReg.argtypes = [ + wintypes.HANDLE, wintypes.LONG, wintypes.LONG, wintypes.ULONG + ] + self._dam3000m.DAM3000M_WriteSingleReg.restype = wintypes.BOOL + + # 读输入寄存器 (UInt16) + self._dam3000m.DAM3000M_ReadInputRegsUInt16.argtypes = [ + wintypes.HANDLE, wintypes.LONG, wintypes.INT, + wintypes.INT, ctypes.POINTER(wintypes.USHORT) + ] + self._dam3000m.DAM3000M_ReadInputRegsUInt16.restype = wintypes.BOOL + + def _get_handle(self) -> wintypes.HANDLE: + """获取设备句柄(支持句柄共享)""" + if self.com_id not in self._handles: + handle = self._dam3000m.DAM3000M_CreateDevice(self.com_id) + if handle in (-1, None, 0): + raise RuntimeError(f"创建设备失败: COM{self.com_id}") + + # 初始化设备: baud_rate, 8位数据位, 无校验, 1位停止位 + if not self._dam3000m.DAM3000M_InitDevice(handle, self.baud_rate, 8, 0, 0x00, 200, 0): + raise RuntimeError(f"初始化设备失败: COM{self.com_id}") + + self._handles[self.com_id] = handle + + return self._handles[self.com_id] + + def _get_device_info(self, device_id: int) -> DeviceInfo: + """获取设备信息""" + device_info = DeviceInfo() + if not self._dam3000m.DAM3000M_GetDeviceInfo(self.handle, device_id, ctypes.byref(device_info)): + raise RuntimeError(f"获取设备信息失败: 设备ID {device_id}") + return device_info + + @property + def device_name(self) -> str: + """获取设备名称""" + name = f"DAM-{self.device_info.DeviceType:02X}{chr(self.device_info.TypeSuffix >> 8 & 0xFF)}" + return name.rstrip(' ') + + def write_single_reg(self, reg_addr: int, value: int) -> bool: + """写单个寄存器""" + return self._dam3000m.DAM3000M_WriteSingleReg( + self.handle, self.device_id, reg_addr, value + ) + + def read_input_regs_uint16(self, start_addr: int, count: int) -> list: + """读输入寄存器 (16位无符号)""" + buffer_type = ctypes.c_ushort * count + data_buffer = buffer_type() + + if not self._dam3000m.DAM3000M_ReadInputRegsUInt16( + self.handle, self.device_id, start_addr, count, data_buffer + ): + raise RuntimeError(f"读输入寄存器失败: 起始地址 {start_addr}, 数量 {count}") + + return list(data_buffer) + + def close(self): + """关闭设备连接""" + if self.com_id in self._handles: + self._dam3000m.DAM3000M_ReleaseDevice(self._handles[self.com_id]) + del self._handles[self.com_id] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False \ No newline at end of file diff --git a/unilabos/i18n/__init__.py b/unilabos/i18n/__init__.py new file mode 100644 index 000000000..80f877c11 --- /dev/null +++ b/unilabos/i18n/__init__.py @@ -0,0 +1,190 @@ +""" +Uni-Lab-OS i18n 国际化支持模块 + +提供多语言支持,支持英文和中文。 +使用方式: + from unilabos.i18n import _, ngettext + + # 简单翻译 + print(_("Hello World")) + + # 复数形式 + print(ngettext("One item", "{count} items", count).format(count=count)) +""" + +import gettext +import os +import re +from typing import Optional, Dict + +# 当前语言设置 +_current_language: str = "en_US" +_translation: Optional[gettext.GNUTranslations] = None +_po_translations: Dict[str, str] = {} + +# 支持的语言 +SUPPORTED_LANGUAGES = { + "en_US": "English", + "zh_CN": "中文", +} + +DEFAULT_LANGUAGE = "en_US" + + +def get_locale_dir() -> str: + """获取本地化文件目录""" + return os.path.join(os.path.dirname(os.path.abspath(__file__)), "locales") + + +def _load_po_file(lang_code: str) -> Dict[str, str]: + """ + 从 PO 文件加载翻译 + + Args: + lang_code: 语言代码 + + Returns: + 翻译字典 {msgid: msgstr} + """ + translations = {} + po_path = os.path.join(get_locale_dir(), lang_code, "LC_MESSAGES", "messages.po") + + if not os.path.exists(po_path): + return translations + + try: + with open(po_path, 'r', encoding='utf-8') as f: + content = f.read() + + # 使用正则表达式解析 PO 文件 + # 匹配 msgid "..." 和 msgstr "..." 对 + pattern = r'msgid\s+"([^"]*)"\s+msgstr\s+"([^"]*)"' + matches = re.findall(pattern, content, re.DOTALL) + + for msgid, msgstr in matches: + # 处理多行字符串(去掉换行符) + msgid = msgid.replace('"\n"', '') + msgstr = msgstr.replace('"\n"', '') + if msgstr: # 只保存有翻译的 + translations[msgid] = msgstr + + except Exception as e: + print(f"Warning: Failed to load PO file: {e}") + + return translations + + +def set_language(lang_code: str) -> bool: + """ + 设置当前语言 + + Args: + lang_code: 语言代码,如 "en_US", "zh_CN" + + Returns: + 是否设置成功 + """ + global _current_language, _translation, _po_translations + + if lang_code not in SUPPORTED_LANGUAGES: + return False + + _current_language = lang_code + _po_translations = {} + + if lang_code == DEFAULT_LANGUAGE: + _translation = None + return True + + locale_dir = get_locale_dir() + + # 首先尝试加载 MO 文件 + mo_path = os.path.join(locale_dir, lang_code, "LC_MESSAGES", "messages.mo") + if os.path.exists(mo_path): + try: + with open(mo_path, 'rb') as f: + _translation = gettext.GNUTranslations(f) + return True + except Exception: + pass + + # 如果 MO 文件不存在或加载失败,加载 PO 文件 + _po_translations = _load_po_file(lang_code) + _translation = None # 使用 PO 文件翻译 + + return True + + +def get_current_language() -> str: + """获取当前语言代码""" + return _current_language + + +def _(message: str) -> str: + """ + 翻译字符串 + + Args: + message: 要翻译的字符串 + + Returns: + 翻译后的字符串 + """ + global _translation, _po_translations + + if _translation is not None: + return _translation.gettext(message) + elif _po_translations: + return _po_translations.get(message, message) + return message + + +def ngettext(singular: str, plural: str, n: int) -> str: + """ + 翻译复数形式的字符串 + + Args: + singular: 单数形式 + plural: 复数形式 + n: 数量 + + Returns: + 根据数量选择合适的翻译 + """ + global _translation, _po_translations + + if _translation is not None: + return _translation.ngettext(singular, plural, n) + elif _po_translations: + # 中文没有复数变化,英文按数量判断 + if _current_language == "zh_CN": + return _po_translations.get(plural, plural) + else: + return _po_translations.get(singular if n == 1 else plural, + singular if n == 1 else plural) + return singular if n == 1 else plural + + +# 便捷别名 +gettext = _ +N = ngettext + + +# 初始化:尝试从环境变量读取语言设置 +def _init_language(): + """初始化语言设置""" + import os + env_lang = os.environ.get("UNILABOS_LANGUAGE", "") + if env_lang in SUPPORTED_LANGUAGES: + set_language(env_lang) + else: + # 尝试从系统环境变量推断 + sys_lang = os.environ.get("LANG", "") + if "zh" in sys_lang.lower(): + set_language("zh_CN") + else: + set_language(DEFAULT_LANGUAGE) + + +# 模块导入时初始化 +_init_language() diff --git a/unilabos/i18n/compile_translations.py b/unilabos/i18n/compile_translations.py new file mode 100644 index 000000000..6cdb6350f --- /dev/null +++ b/unilabos/i18n/compile_translations.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +""" +编译 PO 文件为 MO 文件的脚本 + +使用方法: + python compile_translations.py +""" + +import os +import subprocess +import sys + + +def compile_po_files(): + """编译所有 PO 文件为 MO 文件""" + locale_dir = os.path.join(os.path.dirname(__file__), "locales") + + for lang in ["en_US", "zh_CN"]: + po_path = os.path.join(locale_dir, lang, "LC_MESSAGES", "messages.po") + mo_path = os.path.join(locale_dir, lang, "LC_MESSAGES", "messages.mo") + + if not os.path.exists(po_path): + print(f"Warning: PO file not found: {po_path}") + continue + + # 尝试使用 msgfmt + try: + subprocess.run( + ["msgfmt", "-o", mo_path, po_path], + check=True, + capture_output=True + ) + print(f"✓ Compiled {lang}/messages.po -> messages.mo") + except (subprocess.CalledProcessError, FileNotFoundError): + # msgfmt 不可用,使用纯 Python 实现 + try: + compile_po_to_mo_python(po_path, mo_path) + print(f"✓ Compiled {lang}/messages.po -> messages.mo (using Python)") + except Exception as e: + print(f"✗ Failed to compile {lang}: {e}") + + +def compile_po_to_mo_python(po_path: str, mo_path: str): + """ + 使用纯 Python 将 PO 文件编译为 MO 文件 + + MO 文件格式参考: https://www.gnu.org/software/gettext/manual/html_node/MO-Files.html + """ + import struct + import re + + # 解析 PO 文件 + translations = [] + + with open(po_path, 'r', encoding='utf-8') as f: + content = f.read() + + # 提取 msgid/msgstr 对 + # 支持多行字符串 + pattern = r'msgid\s+((?:"[^"]*"\s*)+)\s+msgstr\s+((?:"[^"]*"\s*)+)' + matches = re.findall(pattern, content) + + for msgid_part, msgstr_part in matches: + # 合并多行字符串 + msgid = ''.join(re.findall(r'"([^"]*)"', msgid_part)) + msgstr = ''.join(re.findall(r'"([^"]*)"', msgstr_part)) + + if msgid and msgstr: # 只保存有翻译的 + translations.append((msgid, msgstr)) + + if not translations: + print(f"Warning: No translations found in {po_path}") + return + + # 构建 MO 文件 + # MO 文件头格式: + # - magic number: 0x950412de + # - version: 0 + # - n: 字符串数量 + # - o: 原文偏移 + # - t: 翻译偏移 + # - hash_table_size: 0 (不使用) + # - hash_table_offset: 0 (不使用) + + n = len(translations) + + # 计算偏移 + header_size = 5 * 4 # 5 个 32 位整数 + table_size = n * 8 # 每个字符串条目 8 字节 (长度 + 偏移) + + original_offset = 28 + table_size * 2 + translation_offset = original_offset + + # 计算每个字符串的位置 + offsets = [] + current_offset = original_offset + + for msgid, msgstr in translations: + offsets.append(current_offset) + current_offset += len(msgid.encode('utf-8')) + 1 # +1 for null terminator + + translation_start = current_offset + + for msgid, msgstr in translations: + offsets.append(current_offset) + current_offset += len(msgstr.encode('utf-8')) + 1 + + # 写入 MO 文件 + with open(mo_path, 'wb') as f: + # 写入头部 + f.write(struct.pack(' args_dict: 命令行参数字典 show_config: 是否显示配置信息 """ + _ = _get_translator() + # 检测终端是否支持ANSI颜色 if platform.system() == "Windows": os.system("") # 启用Windows终端中的ANSI支持 @@ -96,10 +108,10 @@ def print_unilab_banner(args_dict: Dict[str, Any], show_config: bool = True) -> # 显示版本信息 system_info = f""" -{Colors.YELLOW}Version:{Colors.RESET} {Colors.BRIGHT_GREEN}{version}{Colors.RESET} -{Colors.YELLOW}System:{Colors.RESET} {Colors.WHITE}{platform.system()} {platform.release()}{Colors.RESET} -{Colors.YELLOW}Python:{Colors.RESET} {Colors.WHITE}{platform.python_version()}{Colors.RESET} -{Colors.YELLOW}Time:{Colors.RESET} {Colors.WHITE}{current_time}{Colors.RESET} +{Colors.YELLOW}{_("Version:")}{Colors.RESET} {Colors.BRIGHT_GREEN}{version}{Colors.RESET} +{Colors.YELLOW}{_("System:")}{Colors.RESET} {Colors.WHITE}{platform.system()} {platform.release()}{Colors.RESET} +{Colors.YELLOW}{_("Python:")}{Colors.RESET} {Colors.WHITE}{platform.python_version()}{Colors.RESET} +{Colors.YELLOW}{_("Time:")}{Colors.RESET} {Colors.WHITE}{current_time}{Colors.RESET} {Colors.BRIGHT_WHITE}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{Colors.RESET}""" # 打印横幅和系统信息 @@ -117,37 +129,39 @@ def print_config(args_dict: Dict[str, Any]) -> None: Args: args_dict: 命令行参数字典 """ - config_info = f"{Colors.BRIGHT_BLUE}{Colors.BOLD}Configuration:{Colors.RESET}\n" + _ = _get_translator() + + config_info = f"{Colors.BRIGHT_BLUE}{Colors.BOLD}{_('Configuration:')}{Colors.RESET}\n" # 后端信息 if "backend" in args_dict: - config_info += f"{Colors.CYAN}• Backend:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Backend:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{args_dict['backend']}{Colors.RESET}\n" # 桥接信息 if "app_bridges" in args_dict: - config_info += f"{Colors.CYAN}• Bridges:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Bridges:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{', '.join(args_dict['app_bridges'])}{Colors.RESET}\n" # 主机模式 if "without_host" in args_dict: - mode = "Slave" if args_dict["without_host"] else "Master" - config_info += f"{Colors.CYAN}• Host Mode:{Colors.RESET} {Colors.WHITE}{mode}{Colors.RESET}\n" + mode = _("Slave") if args_dict["without_host"] else _("Master") + config_info += f"{Colors.CYAN}• {_('Host Mode:')}{Colors.RESET} {Colors.WHITE}{mode}{Colors.RESET}\n" # 如果有图或设备信息,显示它们 if "graph" in args_dict and args_dict["graph"] is not None: - config_info += f"{Colors.CYAN}• Graph:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Graph:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{args_dict['graph']}{Colors.RESET}\n" elif "devices" in args_dict and args_dict["devices"] is not None: - config_info += f"{Colors.CYAN}• Devices:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Devices:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{args_dict['devices']}{Colors.RESET}\n" if "resources" in args_dict and args_dict["resources"] is not None: - config_info += f"{Colors.CYAN}• Resources:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Resources:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{args_dict['resources']}{Colors.RESET}\n" # 控制器配置 if "controllers" in args_dict and args_dict["controllers"] is not None: - config_info += f"{Colors.CYAN}• Controllers:{Colors.RESET} " + config_info += f"{Colors.CYAN}• {_('Controllers:')}{Colors.RESET} " config_info += f"{Colors.WHITE}{args_dict['controllers']}{Colors.RESET}\n" # 打印结束分隔线 @@ -164,20 +178,22 @@ def print_status(message: str, status_type: str = "info") -> None: message: 要打印的消息 status_type: 状态类型('info', 'success', 'warning', 'error') """ + _ = _get_translator() + color = Colors.WHITE prefix = "" if status_type == "info": color = Colors.BLUE - prefix = "INFO" + prefix = _("INFO") elif status_type == "success": color = Colors.GREEN - prefix = "SUCCESS" + prefix = _("SUCCESS") elif status_type == "warning": color = Colors.YELLOW - prefix = "WARNING" + prefix = _("WARNING") elif status_type == "error": color = Colors.RED - prefix = "ERROR" + prefix = _("ERROR") print(f"{color}[{prefix}]{Colors.RESET} {message}")