优化docx翻译

This commit is contained in:
xunbu
2025-10-13 10:42:20 +08:00
parent 5178970daf
commit aed88a91d6

View File

@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2025 QinHan # SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0 # SPDX-License-Identifier: MPL-2.0
import asyncio import asyncio
import re
from dataclasses import dataclass from dataclasses import dataclass
from io import BytesIO from io import BytesIO
from typing import Self, Literal, List, Dict, Any, Tuple from typing import Self, Literal, List, Dict, Any, Tuple
@@ -18,35 +19,34 @@ from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator
HEADING_STYLES = {f"Heading {i}" for i in range(1, 10)} | \ # ---------------- 辅助函数 ----------------
{f"heading {i}" for i in range(1, 10)} | \
{f"标题 {i}" for i in range(1, 10)}
def is_image_run(run: Run) -> bool: def is_image_run(run: Run) -> bool:
"""检查一个 run 是否包含图片。""" """检查一个 Run 是否包含图片。"""
return '<w:drawing' in run.element.xml or '<w:pict' in run.element.xml xml = getattr(run.element, 'xml', '')
return '<w:drawing' in xml or '<w:pict' in xml
def is_formatting_only_run(run: Run) -> bool: def is_formatting_only_run(run: Run) -> bool:
"""检查一个 run 是否只包含格式(如下划线)而没有实际的、非空白的文本内容""" """检查一个 Run 是否仅用于格式化(例如,一个空的粗体 Run"""
if run.text.strip() == "": text = getattr(run, 'text', None)
if run.underline: if text is None or text.strip() == "":
if run.underline or run.bold or run.italic:
return True return True
return False return False
# ---------------- 配置类 ----------------
@dataclass @dataclass
class DocxTranslatorConfig(AiTranslatorConfig): class DocxTranslatorConfig(AiTranslatorConfig):
"""DocxTranslator 的配置类。"""
insert_mode: Literal["replace", "append", "prepend"] = "replace" insert_mode: Literal["replace", "append", "prepend"] = "replace"
separator: str = "\n" separator: str = "\n"
# ---------------- 主类 ----------------
class DocxTranslator(AiTranslator): class DocxTranslator(AiTranslator):
""" """
用于翻译 .docx 文件的翻译器 用于翻译 .docx 文件的高级翻译器,能够高精度保留样式、处理超链接、
[核心优化] 仅在检测到目录且相关标题被翻译时,才设置“更新域”标志 域代码(如图注),并支持翻译脚注、尾注等
""" """
def __init__(self, config: DocxTranslatorConfig): def __init__(self, config: DocxTranslatorConfig):
@@ -70,222 +70,211 @@ class DocxTranslator(AiTranslator):
system_proxy_enable=config.system_proxy_enable system_proxy_enable=config.system_proxy_enable
) )
self.translate_agent = SegmentsTranslateAgent(agent_config) self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode self.insert_mode = config.insert_mode
self.separator = config.separator self.separator = config.separator
# [新增] 状态变量,用于智能判断 @staticmethod
self._has_toc_field = False def _extract_hyperlink_text(hyperlink_element) -> str:
self._translated_a_heading = False """从超链接 OXML 元素中提取所有显示文本。"""
return ''.join(
t.text for t in hyperlink_element.findall('.//w:t', namespaces=hyperlink_element.nsmap) if t.text
)
def _check_for_toc(self, doc: DocumentObject) -> bool: def _is_seq_field(self, child) -> bool:
"""[新增] 扫描文档检查是否存在目录TOC""" """判断一个 OXML 元素是否为 SEQ 域的一部分(如图、表编号)"""
# 目录的指令文本通常包含 'TOC' try:
# 我们需要查找 <w:instrText> 元素 if child.tag == qn('w:fldSimple'):
for instr_text in doc.element.body.iter(qn('w:instrText')): instr = child.get(qn('w:instr'), '')
if instr_text.text and 'TOC' in instr_text.text.strip(): if 'SEQ' in instr: return True
self.logger.info("在文档中检测到目录TOC") if child.tag == qn('w:r'):
for instr_text in child.findall('.//w:instrText', namespaces=child.nsmap):
if instr_text.text and 'SEQ' in instr_text.text:
return True return True
except Exception:
pass
return False return False
def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]: def _process_paragraph(self, para: Paragraph, elements: List[Dict[str, Any]], texts: List[str]):
""" """
预处理 .docx 文件,提取文本并检测是否需要更新域 使用状态机处理段落,精确切分可翻译的文本片段
""" """
doc = docx.Document(BytesIO(document.content)) if not para.text.strip():
elements_to_translate = [] return
original_texts = []
# [新增] 在开始时重置状态并进行检测 current_text = ""
self._has_toc_field = self._check_for_toc(doc)
self._translated_a_heading = False
def get_hyperlink_text(hyperlink_element) -> str:
text = ""
for t_element in hyperlink_element.findall('.//w:t', namespaces=hyperlink_element.nsmap):
if t_element.text:
text += t_element.text
return text
def process_paragraph_children(para: Paragraph):
nonlocal elements_to_translate, original_texts
current_text_segment = ""
current_runs = [] current_runs = []
is_inside_field = False
# [新增] 检查当前段落是否为标题样式 def flush_segment():
is_heading_para = para.style.name in HEADING_STYLES nonlocal current_text, current_runs
if current_text.strip():
elements.append({"type": "text_runs", "runs": current_runs})
texts.append(current_text)
current_text, current_runs = "", []
for child in para._p: for child in para._p:
if self._is_seq_field(child):
flush_segment()
continue
if child.tag == qn('w:fldChar'):
fld_type = child.get(qn('w:fldCharType'))
if fld_type == 'begin':
flush_segment()
is_inside_field = True
elif fld_type == 'end':
is_inside_field = False
continue
if is_inside_field:
continue
if isinstance(child, CT_R): if isinstance(child, CT_R):
run = Run(child, para) run = Run(child, para)
if is_image_run(run) or is_formatting_only_run(run): if is_image_run(run) or is_formatting_only_run(run) or run.element.find(qn('w:tab')) is not None:
if current_text_segment.strip(): flush_segment()
elements_to_translate.append({"type": "text_runs", "runs": current_runs})
original_texts.append(current_text_segment)
# [新增] 如果这个文本块来自标题段落,则标记
if is_heading_para:
self._translated_a_heading = True
current_text_segment = ""
current_runs = []
else: else:
current_runs.append(run) current_runs.append(run)
current_text_segment += run.text current_text += run.text or ""
elif child.tag == qn('w:hyperlink'): elif child.tag == qn('w:hyperlink'):
# (省略超链接处理逻辑,与之前版本相同) flush_segment()
# ... hyperlink_text = self._extract_hyperlink_text(child)
if current_text_segment.strip():
elements_to_translate.append({"type": "text_runs", "runs": current_runs})
original_texts.append(current_text_segment)
if is_heading_para:
self._translated_a_heading = True
current_text_segment = ""
current_runs = []
hyperlink_text = get_hyperlink_text(child)
if hyperlink_text.strip(): if hyperlink_text.strip():
style_run = None style_run = None
r_elements = child.findall(qn('w:r')) if r_elements := child.findall(f'.//{qn("w:r")}'):
if r_elements:
style_run = Run(r_elements[0], para) style_run = Run(r_elements[0], para)
elements_to_translate.append({ elements.append({"type": "hyperlink", "element": child, "style_run": style_run})
"type": "hyperlink", texts.append(hyperlink_text)
"element": child, else:
"style_run": style_run flush_segment()
})
original_texts.append(hyperlink_text)
if is_heading_para:
self._translated_a_heading = True
if current_text_segment.strip(): flush_segment()
elements_to_translate.append({"type": "text_runs", "runs": current_runs})
original_texts.append(current_text_segment)
# [新增] 如果这个文本块来自标题段落,则标记
if is_heading_para:
self._translated_a_heading = True
def process_container(container): def _process_container(self, container, elements: List[Dict[str, Any]], texts: List[str]):
if not container: """递归处理包含段落和表格的容器(如文档、单元格、页眉)。"""
return if not container: return
for para in container.paragraphs: for para in getattr(container, 'paragraphs', []):
process_paragraph_children(para) self._process_paragraph(para, elements, texts)
for table in container.tables: for table in getattr(container, 'tables', []):
for row in table.rows: for row in table.rows:
for cell in row.cells: for cell in row.cells:
process_container(cell) self._process_container(cell, elements, texts)
process_container(doc) def _process_part(self, doc_part, elements: List[Dict[str, Any]], texts: List[str]):
"""处理文档的非主内容部分,如脚注、尾注。"""
if not doc_part: return
for para_element in doc_part.element.findall('.//w:p', namespaces=doc_part.element.nsmap):
try:
self._process_paragraph(Paragraph(para_element, doc_part), elements, texts)
except Exception as e:
self.logger.warning(f"处理文档部件段落时出错: {e}")
def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]:
doc = docx.Document(BytesIO(document.content))
elements, texts = [], []
# 1. 处理主文档内容
self._process_container(doc, elements, texts)
# 2. (已修复) 处理所有类型的页眉和页脚
for section in doc.sections: for section in doc.sections:
process_container(section.header) self._process_container(section.header, elements, texts)
process_container(section.first_page_header) self._process_container(section.first_page_header, elements, texts)
process_container(section.even_page_header) self._process_container(section.even_page_header, elements, texts)
process_container(section.footer) self._process_container(section.footer, elements, texts)
process_container(section.first_page_footer) self._process_container(section.first_page_footer, elements, texts)
process_container(section.even_page_footer) self._process_container(section.even_page_footer, elements, texts)
return doc, elements_to_translate, original_texts # 3. 处理脚注、尾注
if part := getattr(doc.part, 'footnotes_part', None): self._process_part(part, elements, texts)
if part := getattr(doc.part, 'endnotes_part', None): self._process_part(part, elements, texts)
def _enable_update_fields_on_open(self, doc: DocumentObject): # (已移除) 不再处理批注
settings_element = doc.settings.element
update_fields_tag_clark = qn('w:updateFields')
update_fields = settings_element.find(update_fields_tag_clark)
if update_fields is None:
update_fields = OxmlElement('w:updateFields')
settings_element.append(update_fields)
update_fields.set(qn('w:val'), 'true')
def _after_translate(self, doc: DocumentObject, elements_to_translate: List[Dict[str, Any]], return doc, elements, texts
translated_texts: List[str], original_texts: List[str]) -> bytes:
# 回写翻译文本的逻辑保持不变...
for i, element_info in enumerate(elements_to_translate):
# ... (此处省略与前一版本完全相同的回写代码)
original_text = original_texts[i]
translated_text = translated_texts[i]
if self.insert_mode == "replace": def _apply_translation(self, element_info: Dict[str, Any], final_text: str):
final_text = translated_text """将翻译后的文本写回对应的 OXML 元素。"""
elif self.insert_mode == "append": el_type = element_info["type"]
final_text = original_text + self.separator + translated_text if el_type == "text_runs":
elif self.insert_mode == "prepend":
final_text = translated_text + self.separator + original_text
else:
self.logger.error("不正确的DocxTranslatorConfig参数")
final_text = translated_text
element_type = element_info["type"]
if element_type == "text_runs":
runs = element_info["runs"] runs = element_info["runs"]
if not runs: continue if runs:
runs[0].text = final_text runs[0].text = final_text
for run in runs[1:]: run.text = "" for run in runs[1:]:
run.text = ""
elif el_type == "hyperlink":
hyperlink = element_info["element"]
r_elements = hyperlink.findall(f'.//{qn("w:r")}')
if r_elements:
first_r = r_elements[0]
for t in first_r.findall(f'.//{qn("w:t")}'):
first_r.remove(t)
new_t = OxmlElement('w:t')
new_t.text = final_text
new_t.set(qn('xml:space'), 'preserve')
first_r.append(new_t)
for other_r in r_elements[1:]:
if (parent := other_r.getparent()) is not None:
parent.remove(other_r)
elif element_type == "hyperlink": def _after_translate(self, doc: DocumentObject, elements: List[Dict[str, Any]], translated: List[str],
hyperlink_element = element_info["element"] originals: List[str]) -> bytes:
style_run = element_info["style_run"] if len(elements) != len(translated):
for run_element in hyperlink_element.findall(qn('w:r')): self.logger.error(f"翻译数量不匹配! 原文: {len(originals)}, 译文: {len(translated)}. 将只处理公共部分。")
hyperlink_element.remove(run_element) min_len = min(len(elements), len(translated), len(originals))
new_run_element = OxmlElement('w:r') elements, translated, originals = elements[:min_len], translated[:min_len], originals[:min_len]
if style_run and style_run.element.rPr is not None:
new_run_element.append(style_run.element.rPr)
new_text_element = OxmlElement('w:t')
new_text_element.text = final_text
new_text_element.set(qn('xml:space'), 'preserve')
new_run_element.append(new_text_element)
hyperlink_element.append(new_run_element)
# [核心修改] 智能决策:仅在需要时才启用“打开时更新域” for info, orig, trans in zip(elements, originals, translated):
if self._has_toc_field and self._translated_a_heading: if self.insert_mode == "replace":
self.logger.info("检测到目录且相关标题已被翻译,设置文档在打开时更新域。") final_text = trans
self._enable_update_fields_on_open(doc) elif self.insert_mode == "append":
final_text = orig + self.separator + trans
elif self.insert_mode == "prepend":
final_text = trans + self.separator + orig
else: else:
self.logger.info("未翻译标题或文档无目录,跳过设置更新域标志。") final_text = trans
self._apply_translation(info, final_text)
# (已移除) 不再提示更新域
doc_output_stream = BytesIO() doc_output_stream = BytesIO()
doc.save(doc_output_stream) doc.save(doc_output_stream)
return doc_output_stream.getvalue() return doc_output_stream.getvalue()
# translate 和 translate_async 方法保持不变
def translate(self, document: Document) -> Self: def translate(self, document: Document) -> Self:
doc, elements_to_translate, original_texts = self._pre_translate(document) doc, elements, originals = self._pre_translate(document)
if not original_texts: if not originals:
print("\n文件中没有找到需要翻译的文本内容。") self.logger.info("\n在文档中没有找到需要翻译的文本内容。")
output_stream = BytesIO() document.content = self._after_translate(doc, elements, [], [])
doc.save(output_stream)
document.content = output_stream.getvalue()
return self return self
if self.glossary_agent: if self.glossary_agent:
self.glossary_dict_gen = self.glossary_agent.send_segments(original_texts, self.chunk_size) self.glossary_dict_gen = self.glossary_agent.send_segments(originals, self.chunk_size)
if self.translate_agent: if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen) self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
if self.translate_agent: translated = self.translate_agent.send_segments(originals,
translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size) self.chunk_size) if self.translate_agent else originals
else:
translated_texts = original_texts
document.content = self._after_translate(doc, elements_to_translate, translated_texts, original_texts) document.content = self._after_translate(doc, elements, translated, originals)
return self return self
async def translate_async(self, document: Document) -> Self: async def translate_async(self, document: Document) -> Self:
doc, elements_to_translate, original_texts = await asyncio.to_thread(self._pre_translate, document) doc, elements, originals = await asyncio.to_thread(self._pre_translate, document)
if not original_texts: if not originals:
print("\n文件中没有找到需要翻译的文本内容。") self.logger.info("\n在文档中没有找到需要翻译的文本内容。")
output_stream = BytesIO() document.content = await asyncio.to_thread(self._after_translate, doc, elements, [], [])
await asyncio.to_thread(doc.save, output_stream)
document.content = output_stream.getvalue()
return self return self
if self.glossary_agent: if self.glossary_agent:
self.glossary_dict_gen = await self.glossary_agent.send_segments_async(original_texts, self.chunk_size) self.glossary_dict_gen = await self.glossary_agent.send_segments_async(originals, self.chunk_size)
if self.translate_agent: if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen) self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
if self.translate_agent: translated = await self.translate_agent.send_segments_async(originals,
translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size) self.chunk_size) if self.translate_agent else originals
else:
translated_texts = original_texts document.content = await asyncio.to_thread(self._after_translate, doc, elements, translated, originals)
document.content = await asyncio.to_thread(self._after_translate, doc, elements_to_translate, translated_texts,
original_texts)
return self return self