From aed88a91d6f791e6692fef721f98a8ca5d637045 Mon Sep 17 00:00:00 2001 From: xunbu Date: Mon, 13 Oct 2025 10:42:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96docx=E7=BF=BB=E8=AF=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai_translator/docx_translator.py | 365 +++++++++--------- 1 file changed, 177 insertions(+), 188 deletions(-) diff --git a/docutranslate/translator/ai_translator/docx_translator.py b/docutranslate/translator/ai_translator/docx_translator.py index 056969b..546f4ed 100644 --- a/docutranslate/translator/ai_translator/docx_translator.py +++ b/docutranslate/translator/ai_translator/docx_translator.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2025 QinHan # SPDX-License-Identifier: MPL-2.0 import asyncio +import re from dataclasses import dataclass from io import BytesIO from typing import Self, Literal, List, Dict, Any, Tuple @@ -18,35 +19,34 @@ from docutranslate.ir.document import Document from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator -HEADING_STYLES = {f"Heading {i}" for i in range(1, 10)} | \ - {f"heading {i}" for i in range(1, 10)} | \ - {f"标题 {i}" for i in range(1, 10)} - - +# ---------------- 辅助函数 ---------------- def is_image_run(run: Run) -> bool: - """检查一个 run 是否包含图片。""" - return ' bool: - """检查一个 run 是否只包含格式(如下划线)而没有实际的、非空白的文本内容。""" - if run.text.strip() == "": - if run.underline: + """检查一个 Run 是否仅用于格式化(例如,一个空的粗体 Run)。""" + text = getattr(run, 'text', None) + if text is None or text.strip() == "": + if run.underline or run.bold or run.italic: return True return False +# ---------------- 配置类 ---------------- @dataclass class DocxTranslatorConfig(AiTranslatorConfig): - """DocxTranslator 的配置类。""" insert_mode: Literal["replace", "append", "prepend"] = "replace" separator: str = "\n" +# ---------------- 主类 ---------------- class DocxTranslator(AiTranslator): """ - 用于翻译 .docx 文件的翻译器。 - [核心优化] 仅在检测到目录且相关标题被翻译时,才设置“更新域”标志。 + 用于翻译 .docx 文件的高级翻译器,能够高精度保留样式、处理超链接、 + 域代码(如图注),并支持翻译脚注、尾注等。 """ def __init__(self, config: DocxTranslatorConfig): @@ -70,222 +70,211 @@ class DocxTranslator(AiTranslator): system_proxy_enable=config.system_proxy_enable ) self.translate_agent = SegmentsTranslateAgent(agent_config) + self.insert_mode = config.insert_mode self.separator = config.separator - # [新增] 状态变量,用于智能判断 - self._has_toc_field = False - self._translated_a_heading = False + @staticmethod + def _extract_hyperlink_text(hyperlink_element) -> str: + """从超链接 OXML 元素中提取所有显示文本。""" + return ''.join( + t.text for t in hyperlink_element.findall('.//w:t', namespaces=hyperlink_element.nsmap) if t.text + ) - def _check_for_toc(self, doc: DocumentObject) -> bool: - """[新增] 扫描文档,检查是否存在目录(TOC)域。""" - # 目录的指令文本通常包含 'TOC' - # 我们需要查找 元素 - for instr_text in doc.element.body.iter(qn('w:instrText')): - if instr_text.text and 'TOC' in instr_text.text.strip(): - self.logger.info("在文档中检测到目录(TOC)。") - return True + def _is_seq_field(self, child) -> bool: + """判断一个 OXML 元素是否为 SEQ 域的一部分(如图、表编号)。""" + try: + if child.tag == qn('w:fldSimple'): + instr = child.get(qn('w:instr'), '') + if 'SEQ' in instr: return True + if child.tag == qn('w:r'): + for instr_text in child.findall('.//w:instrText', namespaces=child.nsmap): + if instr_text.text and 'SEQ' in instr_text.text: + return True + except Exception: + pass return False - def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]: + def _process_paragraph(self, para: Paragraph, elements: List[Dict[str, Any]], texts: List[str]): """ - 预处理 .docx 文件,提取文本并检测是否需要更新域。 + 使用状态机处理段落,精确切分可翻译的文本片段。 """ - doc = docx.Document(BytesIO(document.content)) - elements_to_translate = [] - original_texts = [] + if not para.text.strip(): + return - # [新增] 在开始时重置状态并进行检测 - self._has_toc_field = self._check_for_toc(doc) - self._translated_a_heading = False + current_text = "" + current_runs = [] + is_inside_field = False - def get_hyperlink_text(hyperlink_element) -> str: - text = "" - for t_element in hyperlink_element.findall('.//w:t', namespaces=hyperlink_element.nsmap): - if t_element.text: - text += t_element.text - return text + def flush_segment(): + nonlocal current_text, current_runs + if current_text.strip(): + elements.append({"type": "text_runs", "runs": current_runs}) + texts.append(current_text) + current_text, current_runs = "", [] - def process_paragraph_children(para: Paragraph): - nonlocal elements_to_translate, original_texts - current_text_segment = "" - current_runs = [] + for child in para._p: + if self._is_seq_field(child): + flush_segment() + continue - # [新增] 检查当前段落是否为标题样式 - is_heading_para = para.style.name in HEADING_STYLES + if child.tag == qn('w:fldChar'): + fld_type = child.get(qn('w:fldCharType')) + if fld_type == 'begin': + flush_segment() + is_inside_field = True + elif fld_type == 'end': + is_inside_field = False + continue - for child in para._p: - if isinstance(child, CT_R): - run = Run(child, para) - if is_image_run(run) or is_formatting_only_run(run): - if current_text_segment.strip(): - elements_to_translate.append({"type": "text_runs", "runs": current_runs}) - original_texts.append(current_text_segment) - # [新增] 如果这个文本块来自标题段落,则标记 - if is_heading_para: - self._translated_a_heading = True - current_text_segment = "" - current_runs = [] - else: - current_runs.append(run) - current_text_segment += run.text + if is_inside_field: + continue - elif child.tag == qn('w:hyperlink'): - # (省略超链接处理逻辑,与之前版本相同) - # ... - if current_text_segment.strip(): - elements_to_translate.append({"type": "text_runs", "runs": current_runs}) - original_texts.append(current_text_segment) - if is_heading_para: - self._translated_a_heading = True - current_text_segment = "" - current_runs = [] + if isinstance(child, CT_R): + run = Run(child, para) + if is_image_run(run) or is_formatting_only_run(run) or run.element.find(qn('w:tab')) is not None: + flush_segment() + else: + current_runs.append(run) + current_text += run.text or "" + elif child.tag == qn('w:hyperlink'): + flush_segment() + hyperlink_text = self._extract_hyperlink_text(child) + if hyperlink_text.strip(): + style_run = None + if r_elements := child.findall(f'.//{qn("w:r")}'): + style_run = Run(r_elements[0], para) - hyperlink_text = get_hyperlink_text(child) - if hyperlink_text.strip(): - style_run = None - r_elements = child.findall(qn('w:r')) - if r_elements: - style_run = Run(r_elements[0], para) - - elements_to_translate.append({ - "type": "hyperlink", - "element": child, - "style_run": style_run - }) - original_texts.append(hyperlink_text) - if is_heading_para: - self._translated_a_heading = True - - if current_text_segment.strip(): - elements_to_translate.append({"type": "text_runs", "runs": current_runs}) - original_texts.append(current_text_segment) - # [新增] 如果这个文本块来自标题段落,则标记 - if is_heading_para: - self._translated_a_heading = True - - def process_container(container): - if not container: - return - for para in container.paragraphs: - process_paragraph_children(para) - for table in container.tables: - for row in table.rows: - for cell in row.cells: - process_container(cell) - - process_container(doc) - for section in doc.sections: - process_container(section.header) - process_container(section.first_page_header) - process_container(section.even_page_header) - process_container(section.footer) - process_container(section.first_page_footer) - process_container(section.even_page_footer) - - return doc, elements_to_translate, original_texts - - def _enable_update_fields_on_open(self, doc: DocumentObject): - settings_element = doc.settings.element - update_fields_tag_clark = qn('w:updateFields') - update_fields = settings_element.find(update_fields_tag_clark) - if update_fields is None: - update_fields = OxmlElement('w:updateFields') - settings_element.append(update_fields) - update_fields.set(qn('w:val'), 'true') - - def _after_translate(self, doc: DocumentObject, elements_to_translate: List[Dict[str, Any]], - translated_texts: List[str], original_texts: List[str]) -> bytes: - # 回写翻译文本的逻辑保持不变... - for i, element_info in enumerate(elements_to_translate): - # ... (此处省略与前一版本完全相同的回写代码) - original_text = original_texts[i] - translated_text = translated_texts[i] - - if self.insert_mode == "replace": - final_text = translated_text - elif self.insert_mode == "append": - final_text = original_text + self.separator + translated_text - elif self.insert_mode == "prepend": - final_text = translated_text + self.separator + original_text + elements.append({"type": "hyperlink", "element": child, "style_run": style_run}) + texts.append(hyperlink_text) else: - self.logger.error("不正确的DocxTranslatorConfig参数") - final_text = translated_text + flush_segment() - element_type = element_info["type"] + flush_segment() - if element_type == "text_runs": - runs = element_info["runs"] - if not runs: continue + def _process_container(self, container, elements: List[Dict[str, Any]], texts: List[str]): + """递归处理包含段落和表格的容器(如文档、单元格、页眉)。""" + if not container: return + for para in getattr(container, 'paragraphs', []): + self._process_paragraph(para, elements, texts) + for table in getattr(container, 'tables', []): + for row in table.rows: + for cell in row.cells: + self._process_container(cell, elements, texts) + + def _process_part(self, doc_part, elements: List[Dict[str, Any]], texts: List[str]): + """处理文档的非主内容部分,如脚注、尾注。""" + if not doc_part: return + for para_element in doc_part.element.findall('.//w:p', namespaces=doc_part.element.nsmap): + try: + self._process_paragraph(Paragraph(para_element, doc_part), elements, texts) + except Exception as e: + self.logger.warning(f"处理文档部件段落时出错: {e}") + + def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]: + doc = docx.Document(BytesIO(document.content)) + elements, texts = [], [] + + # 1. 处理主文档内容 + self._process_container(doc, elements, texts) + + # 2. (已修复) 处理所有类型的页眉和页脚 + for section in doc.sections: + self._process_container(section.header, elements, texts) + self._process_container(section.first_page_header, elements, texts) + self._process_container(section.even_page_header, elements, texts) + self._process_container(section.footer, elements, texts) + self._process_container(section.first_page_footer, elements, texts) + self._process_container(section.even_page_footer, elements, texts) + + # 3. 处理脚注、尾注 + if part := getattr(doc.part, 'footnotes_part', None): self._process_part(part, elements, texts) + if part := getattr(doc.part, 'endnotes_part', None): self._process_part(part, elements, texts) + + # (已移除) 不再处理批注 + + return doc, elements, texts + + def _apply_translation(self, element_info: Dict[str, Any], final_text: str): + """将翻译后的文本写回对应的 OXML 元素。""" + el_type = element_info["type"] + if el_type == "text_runs": + runs = element_info["runs"] + if runs: runs[0].text = final_text - for run in runs[1:]: run.text = "" + for run in runs[1:]: + run.text = "" + elif el_type == "hyperlink": + hyperlink = element_info["element"] + r_elements = hyperlink.findall(f'.//{qn("w:r")}') + if r_elements: + first_r = r_elements[0] + for t in first_r.findall(f'.//{qn("w:t")}'): + first_r.remove(t) + new_t = OxmlElement('w:t') + new_t.text = final_text + new_t.set(qn('xml:space'), 'preserve') + first_r.append(new_t) + for other_r in r_elements[1:]: + if (parent := other_r.getparent()) is not None: + parent.remove(other_r) - elif element_type == "hyperlink": - hyperlink_element = element_info["element"] - style_run = element_info["style_run"] - for run_element in hyperlink_element.findall(qn('w:r')): - hyperlink_element.remove(run_element) - new_run_element = OxmlElement('w:r') - if style_run and style_run.element.rPr is not None: - new_run_element.append(style_run.element.rPr) - new_text_element = OxmlElement('w:t') - new_text_element.text = final_text - new_text_element.set(qn('xml:space'), 'preserve') - new_run_element.append(new_text_element) - hyperlink_element.append(new_run_element) + def _after_translate(self, doc: DocumentObject, elements: List[Dict[str, Any]], translated: List[str], + originals: List[str]) -> bytes: + if len(elements) != len(translated): + self.logger.error(f"翻译数量不匹配! 原文: {len(originals)}, 译文: {len(translated)}. 将只处理公共部分。") + min_len = min(len(elements), len(translated), len(originals)) + elements, translated, originals = elements[:min_len], translated[:min_len], originals[:min_len] - # [核心修改] 智能决策:仅在需要时才启用“打开时更新域” - if self._has_toc_field and self._translated_a_heading: - self.logger.info("检测到目录且相关标题已被翻译,设置文档在打开时更新域。") - self._enable_update_fields_on_open(doc) - else: - self.logger.info("未翻译标题或文档无目录,跳过设置更新域标志。") + for info, orig, trans in zip(elements, originals, translated): + if self.insert_mode == "replace": + final_text = trans + elif self.insert_mode == "append": + final_text = orig + self.separator + trans + elif self.insert_mode == "prepend": + final_text = trans + self.separator + orig + else: + final_text = trans + self._apply_translation(info, final_text) + + # (已移除) 不再提示更新域 doc_output_stream = BytesIO() doc.save(doc_output_stream) return doc_output_stream.getvalue() - # translate 和 translate_async 方法保持不变 def translate(self, document: Document) -> Self: - doc, elements_to_translate, original_texts = self._pre_translate(document) - if not original_texts: - print("\n文件中没有找到需要翻译的文本内容。") - output_stream = BytesIO() - doc.save(output_stream) - document.content = output_stream.getvalue() + doc, elements, originals = self._pre_translate(document) + if not originals: + self.logger.info("\n在文档中没有找到需要翻译的文本内容。") + document.content = self._after_translate(doc, elements, [], []) return self if self.glossary_agent: - self.glossary_dict_gen = self.glossary_agent.send_segments(original_texts, self.chunk_size) + self.glossary_dict_gen = self.glossary_agent.send_segments(originals, self.chunk_size) if self.translate_agent: self.translate_agent.update_glossary_dict(self.glossary_dict_gen) - if self.translate_agent: - translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size) - else: - translated_texts = original_texts + translated = self.translate_agent.send_segments(originals, + self.chunk_size) if self.translate_agent else originals - document.content = self._after_translate(doc, elements_to_translate, translated_texts, original_texts) + document.content = self._after_translate(doc, elements, translated, originals) return self async def translate_async(self, document: Document) -> Self: - doc, elements_to_translate, original_texts = await asyncio.to_thread(self._pre_translate, document) - if not original_texts: - print("\n文件中没有找到需要翻译的文本内容。") - output_stream = BytesIO() - await asyncio.to_thread(doc.save, output_stream) - document.content = output_stream.getvalue() + doc, elements, originals = await asyncio.to_thread(self._pre_translate, document) + if not originals: + self.logger.info("\n在文档中没有找到需要翻译的文本内容。") + document.content = await asyncio.to_thread(self._after_translate, doc, elements, [], []) return self if self.glossary_agent: - self.glossary_dict_gen = await self.glossary_agent.send_segments_async(original_texts, self.chunk_size) + self.glossary_dict_gen = await self.glossary_agent.send_segments_async(originals, self.chunk_size) if self.translate_agent: self.translate_agent.update_glossary_dict(self.glossary_dict_gen) - if self.translate_agent: - translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size) - else: - translated_texts = original_texts - document.content = await asyncio.to_thread(self._after_translate, doc, elements_to_translate, translated_texts, - original_texts) + translated = await self.translate_agent.send_segments_async(originals, + self.chunk_size) if self.translate_agent else originals + + document.content = await asyncio.to_thread(self._after_translate, doc, elements, translated, originals) return self \ No newline at end of file