mirror of
https://github.com/DragonOS-Community/DragonOS.git
synced 2025-06-18 16:26:31 +00:00
doc: Add ai doc translate tool and add English doc. (#1168)
- add tools/doc_translator.py
- translated docs into English

Signed-off-by: longjin <longjin@DragonOS.org>
tools/doc_translator.py | 467 lines | new file

@@ -0,0 +1,467 @@
#!/usr/bin/env python3
"""
DragonOS documentation auto-translation tool
Usage:
Run this script from the root of the DragonOS source tree.

First enter the docs directory and run the command below to install the dependencies.
pip install -r requirements.txt

Then export the following variables:
export OPENAI_API_KEY=your_api_key
export OPENAI_MODEL=your_model_name (a qwen3 model of 4b or larger is recommended)
export OPENAI_BASE_URL=your_openai_base_url
export MAX_WORKERS=your_max_workers (2-20 recommended)

Then run:
python3 tools/doc_translator.py
"""

import os
import re
import hashlib
import json
from pathlib import Path
import threading
from typing import List, Dict, Tuple
import openai
import datetime
import time

from tqdm import tqdm

# Configuration


def get_env_var(name, required=False, default=None):
    """Read a configuration value from an environment variable."""
    value = os.getenv(name, default)
    if required and not value:
        raise ValueError(f"Environment variable {name} is not set")
    return value


CONFIG = {
    "source_dir": "docs",  # Source documentation directory
    "target_languages": {
        "en": "English",
    },
    "dirs_exclude": ["_build", "locales"],  # Directories to exclude
    "model": get_env_var("OPENAI_MODEL", default="qwen3:4b"),  # Model name
    # API endpoint
    "base_url": get_env_var("OPENAI_BASE_URL", default="http://localhost:11434/v1"),
    "chunk_size": 1000,  # Chunk size (tokens)
    "cache_file": "docs/.translation_cache.json",  # Translation cache file
    "max_workers": int(get_env_var("MAX_WORKERS", default="1")),  # Number of parallel workers
    # Metadata templates
    "meta_templates": {
        ".rst": (
            ".. note:: AI Translation Notice\n\n"
            " This document was automatically translated by `{model}` model, for reference only.\n\n"
            " - Source document: {original_path}\n\n"
            " - Translation time: {timestamp}\n\n"
            " - Translation model: `{model}`\n\n"
            "\n Please report issues via `Community Channel <https://github.com/DragonOS-Community/DragonOS/issues>`_\n\n"
        ),
        ".md": (
            ":::{note}\n"
            "**AI Translation Notice**\n\n"
            "This document was automatically translated by `{model}` model, for reference only.\n\n"
            "- Source document: {original_path}\n\n"
            "- Translation time: {timestamp}\n\n"
            "- Translation model: `{model}`\n\n"
            "Please report issues via [Community Channel](https://github.com/DragonOS-Community/DragonOS/issues)\n\n"
            ":::\n\n"
        )
    }
}


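# As a rough illustration (file name and timestamp are hypothetical, blank lines
# omitted), the ".md" template above rendered with model="qwen3:4b" and
# original_path="kernel/sched.md" starts with:
#   :::{note}
#   **AI Translation Notice**
#   This document was automatically translated by `qwen3:4b` model, for reference only.
#   - Source document: kernel/sched.md
#   ...
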
class LabelManager:
    """Manage document labels and references."""

    def __init__(self, lang: str):
        self.label_map = {}
        self.prefix = "_translated_label_"
        self.lang = lang

    def register_label(self, original_label: str) -> str:
        """Register a new label and return the mapped label."""
        if original_label not in self.label_map:
            new_label = f"{self.prefix}_{original_label}_{self.lang}"
            self.label_map[original_label] = new_label
        return self.label_map[original_label]

    def get_all_labels(self) -> Dict[str, str]:
        """Return all label mappings."""
        return self.label_map


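# For example (illustrative label name), with lang="en" a call to
# LabelManager("en").register_label("build_system") returns
# "_translated_label__build_system_en"; the double underscore comes from the
# trailing "_" in the prefix plus the separator in the f-string above.
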
class DocumentTranslator:
    def __init__(self):
        self._cache_lock = threading.Lock()
        self._cache = self._load_cache()
        self.fail_count = 0
        try:
            self.client = openai.OpenAI(
                base_url=CONFIG["base_url"],
                # The API key is deliberately read here instead of being stored in CONFIG,
                # so that carelessly printing CONFIG cannot leak the key.
                api_key=get_env_var("OPENAI_API_KEY", default="ollama"),
            )
        except Exception as e:
            raise RuntimeError(f"Failed to initialize the OpenAI client: {str(e)}")

    def _load_cache(self) -> Dict:
        """Load the translation cache."""
        if os.path.exists(CONFIG["cache_file"]):
            with open(CONFIG["cache_file"], "r", encoding="utf-8") as f:
                try:
                    return json.load(f)
                except json.JSONDecodeError:
                    pass
        return {}

    def _save_cache(self):
        """Save the translation cache."""
        with self._cache_lock:
            with open(CONFIG["cache_file"], "w", encoding="utf-8") as f:
                json.dump(self._cache, f, ensure_ascii=False, indent=2)

    def _get_cache_key(self, filepath: str, lang: str) -> str:
        """Build the cache key (includes the language code)."""
        rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
        return f"{lang}:{rel_path}"
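
    # For example (illustrative path), _get_cache_key("docs/kernel/sched.md", "en")
    # returns "en:kernel/sched.md", so the same source file gets a separate cache
    # entry per target language.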

    def _split_into_chunks(self, text: str) -> List[str]:
        """Split the text into chunks."""
        # Split by paragraph
        paragraphs = re.split(r"\n\s*\n", text)
        chunks = []
        current_chunk = []
        current_size = 0
        for para in paragraphs:
            para_size = len(para.split())
            if current_size + para_size > CONFIG["chunk_size"] and current_chunk:
                chunks.append("\n\n".join(current_chunk))
                current_chunk = []
                current_size = 0
            current_chunk.append(para)
            current_size += para_size

        if current_chunk:
            chunks.append("\n\n".join(current_chunk))

        return chunks
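
    # Sketch of the chunking behaviour (word counts are hypothetical): with
    # chunk_size=1000, paragraphs of 600, 600 and 300 whitespace-separated words
    # come out as two chunks, ["p1", "p2\n\np3"]; sizes here are plain word
    # counts, not model tokens.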

    def _process_rst_labels(self, text: str, label_manager: LabelManager) -> str:
        """Process reStructuredText labels."""
        def replace_label(match):
            original_label = match.group(1)
            new_label = label_manager.register_label(original_label)
            return f'.. {new_label}:'

        # Label definitions
        text = re.sub(r'\.\.\s+_([^:]+):', replace_label, text)
        # Label references
        text = re.sub(r'(?<!\w)`([^`]+)`(?!\w)',
                      lambda m: f'`{label_manager.register_label(m.group(1))}`',
                      text)
        return text
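
    # For example (illustrative label), a definition ".. _build:" is rewritten to
    # ".. _translated_label__build_en:" when translating to English.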

    def _process_md_labels(self, text: str, label_manager: LabelManager) -> str:
        """Process Markdown labels."""
        # Explicit label definitions {#label}
        text = re.sub(r'\{#([^}]+)\}',
                      lambda m: f'{{#{label_manager.register_label(m.group(1))}}}',
                      text)

        # Explicit label definitions (label)=
        text = re.sub(r'\(([^)]+)\)=',
                      lambda m: f'({label_manager.register_label(m.group(1))})=',
                      text)

        # Label references [text](#label)
        text = re.sub(r'\[([^\]]+)\]\(#([^)]+)\)',
                      lambda m: f'[{m.group(1)}](#{label_manager.register_label(m.group(2))})',
                      text)

        # Bare label references #label
        text = re.sub(r'(?<!\w)#([\w-]+)(?!\w)',
                      lambda m: f'#{label_manager.register_label(m.group(1))}',
                      text)

        return text
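
    # For example (illustrative label), a MyST definition "(getting_started)=" is
    # rewritten to "(_translated_label__getting_started_en)=" for the English copy.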

    def _generate_unique_label_for_lang(self, text: str, lang: str) -> str:
        # Process labels

        label_manager = LabelManager(lang)
        text = self._process_rst_labels(text, label_manager)
        text = self._process_md_labels(text, label_manager)
        return text

    def _preserve_special_format(self, text: str) -> Tuple[str, Dict]:
        """Preserve special formatting."""
        preserved = {}

        # Drop blocks that must not appear in the translated output
        exclude_blocks = re.findall(
            r'\.\. Note: __EXCLUDE_IN_TRANSLATED_START.*?\.\. Note: __EXCLUDE_IN_TRANSLATED_END',
            text, re.DOTALL)
        for block in exclude_blocks:
            text = text.replace(block, '')

        # Multi-line code blocks
        code_blocks = re.findall(r"```.*?\n.*?```", text, re.DOTALL)
        for i, block in enumerate(code_blocks):
            placeholder = f"__CODE_BLOCK_{i}__"
            preserved[placeholder] = block
            text = text.replace(block, placeholder)
        # Inline code spans
        inline_code = re.findall(r"`[^`]+`", text)
        for i, code in enumerate(inline_code):
            placeholder = f"__INLINE_CODE_{i}__"
            preserved[placeholder] = code
            text = text.replace(code, placeholder)

        return text, preserved
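
    # Sketch of the placeholder scheme (hypothetical snippet): an inline span such
    # as `cargo build` is replaced by "__INLINE_CODE_0__" before translation and is
    # put back afterwards by _restore_special_format().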

    def _restore_special_format(self, text: str, preserved: Dict) -> str:
        """Restore special formatting."""
        # Restore inline code spans first
        for placeholder, content in preserved.items():
            if placeholder.startswith("__INLINE_CODE_"):
                text = text.replace(placeholder, content)

        # Then restore multi-line code blocks
        for placeholder, content in preserved.items():
            if placeholder.startswith("__CODE_BLOCK_"):
                text = text.replace(placeholder, content)

        return text

    def _remove_thinking(self, text: str) -> str:
        """Remove <think> tags from text"""
        return re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
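
    # For instance, "<think>internal reasoning</think>Hello" becomes "Hello", which
    # matters for models that emit a reasoning block before the translated text.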

    def _translate_chunk(self, args: Tuple[str, str]) -> str:
        """Translate a single text chunk (internal method, used for parallel processing)."""
        chunk, lang = args
        retry = 3
        while retry > 0:
            try:
                lang_name = CONFIG["target_languages"].get(lang, "English")
                prompt = f"You are a professional documentation translation assistant. Translate the following Chinese technical documentation into {lang_name} accurately, keeping technical terms correct and the formatting unchanged."

                # disable qwen3's thinking mode
                if "qwen3" in CONFIG["model"].lower():
                    prompt += "\n/no_think\n"
                    chunk += "\n/no_think\n"

                response = self.client.chat.completions.create(
                    extra_body={"enable_thinking": False},
                    model=CONFIG["model"],
                    messages=[
                        {"role": "system", "content": prompt},
                        {"role": "user", "content": chunk}
                    ],
                    temperature=0.3,
                )
                content = response.choices[0].message.content
                return self._remove_thinking(content)
            except Exception as e:
                retry -= 1
                if retry == 0:
                    print(f"Translation failed: {e}, giving up.")
                    return None

                print(f"Translation error: {e}, retrying... ({retry})")
                time.sleep(2)

    def translate_text(self, text: str, lang: str) -> str:
        """Translate text using the OpenAI API
        Args:
            text: the text to translate
            lang: target language code
        """
        chunks = self._split_into_chunks(text)
        translated_chunks = []

        for chunk in chunks:
            translated_chunk = self._translate_chunk((chunk, lang))
            if translated_chunk:
                translated_chunks.append(translated_chunk)

        return "\n\n".join(translated_chunks)

    def process_file(self, filepath: str, lang: str = "en"):
        """Process a single file
        Args:
            filepath: path of the source file
            lang: target language code (default 'en')
        """
        rel_path = os.path.relpath(filepath, CONFIG["source_dir"])
        target_path = os.path.join(
            CONFIG["source_dir"], "locales", lang, rel_path)

        # Skip the file if the translation already exists and the source is unchanged
        cache_key = self._get_cache_key(filepath, lang)
        file_hash = hashlib.md5(open(filepath, "rb").read()).hexdigest()
        target_file_exists = os.path.exists(target_path)
        with self._cache_lock:
            if cache_key in self._cache and self._cache[cache_key]["hash"] == file_hash and target_file_exists:
                print(f"File unchanged, skipping: {rel_path} (language: {lang})")
                return

        print(f"Processing: {rel_path}")

        # Read the file content
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()

        # Preserve special formatting
        content, preserved = self._preserve_special_format(content)

        content = self._generate_unique_label_for_lang(content, lang)

        # Translate chunk by chunk
        translated_content = self.translate_text(
            content, lang)
        if not translated_content:
            print(f"Translation failed! {filepath}")
            self.fail_count += 1
            return

        # Restore special formatting
        translated_content = self._restore_special_format(
            translated_content, preserved)

        # Create the target directory
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # Write the translated result
        with open(target_path, "w", encoding="utf-8") as f:
            # Prepend the translation metadata
            file_ext = os.path.splitext(filepath)[1]
            template = CONFIG["meta_templates"].get(file_ext, "")

            if template:
                timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                original_path = os.path.relpath(filepath, CONFIG["source_dir"])
                meta_content = template.format(
                    note="{note}",
                    model=CONFIG["model"],
                    timestamp=timestamp,
                    original_name=os.path.basename(filepath),
                    original_path=original_path
                )
                translated_content = meta_content + translated_content

            f.write(translated_content)
            f.write("\n")

        # Update the cache
        with self._cache_lock:
            self._cache[cache_key] = {
                "hash": file_hash,
            }

        self._save_cache()

        print(f"File {rel_path} was translated to {lang} and saved to {target_path}")

    def add_language_root_title(self, lang):
        """Add a title to the root document of each language."""
        lang_root_doc_path = os.path.join(
            CONFIG["source_dir"], "locales", lang, "index.rst")

        if not os.path.exists(lang_root_doc_path):
            raise FileNotFoundError(f"Root document for {lang} not found: {lang_root_doc_path}")
        print(f"Adding a title for {CONFIG['target_languages'][lang]}...")

        # Read existing content first
        with open(lang_root_doc_path, "r", encoding="utf-8") as f:
            content = f.read()

        lang_v = CONFIG["target_languages"][lang]
        if content.startswith(lang_v):
            print(f"The title for {lang_v} already exists, skipping...")
            return

        # Then write new content (this clears the file)
        with open(lang_root_doc_path, "w", encoding="utf-8") as f:
            f.write(
                f"{lang_v}\n==========================================\n{content}")

        print(f"Title added to {lang_root_doc_path}")

    def run(self):
        """Run the translation pipeline."""

        print("Collecting all files...")
        all_files = []
        for root, dirs, files in os.walk(CONFIG["source_dir"], topdown=True):
            # Apply the exclusion logic only at the root directory
            if root == CONFIG["source_dir"]:
                dirs[:] = [d for d in dirs if d not in CONFIG["dirs_exclude"]]
            for file in files:
                if file.endswith((".rst", ".md")):
                    all_files.append(os.path.join(root, file))
        total_files = len(all_files)
        print(
            f"Total {total_files} files to translate in {len(CONFIG['target_languages'])} languages.")
        total_tasks = total_files * len(CONFIG["target_languages"])

        # Outer progress bar: languages
        lang_pbar = tqdm(CONFIG["target_languages"].items(),
                         desc="Overall progress",
                         unit="lang",
                         position=0)

        for lang_k, lang_v in lang_pbar:
            lang_pbar.set_description(f"Translating to {lang_v}")

            # Process files in parallel
            from concurrent.futures import ThreadPoolExecutor, as_completed

            # Wrap the per-file handler (convenient for debugging and the like)

            def process_file_wrapper(file_path):
                self.process_file(file_path, lang_k)
                return file_path

            # Create the thread pool
            with ThreadPoolExecutor(max_workers=CONFIG["max_workers"]) as executor:
                # Submit one task per file
                futures = [executor.submit(
                    process_file_wrapper, path) for path in all_files]

                # Create the progress bar
                file_pbar = tqdm(total=len(all_files),
                                 desc=f"Files in {lang_v}",
                                 unit="file",
                                 position=1,
                                 leave=False)

                # Update the progress bar
                for future in as_completed(futures):
                    file_pbar.update(1)
                    future.result()  # re-raises any exception from the worker

                file_pbar.close()
            self.add_language_root_title(lang_k)

        lang_pbar.close()
        print(
            f"\nTranslation finished! Succ: {total_tasks-self.fail_count}, Fail: {self.fail_count}")


if __name__ == "__main__":
    print("Starting translation process...")
    print("WORKERS: ", CONFIG["max_workers"])
    print("LANGUAGES: ", CONFIG["target_languages"])
    print("SOURCE_DIR: ", CONFIG["source_dir"])
    print("MODEL: ", CONFIG["model"])

    translator = DocumentTranslator()
    translator.run()