Files
docutranslate/docutranslate/translator/ai_translator/docx_translator.py
2025-10-19 15:08:34 +08:00

385 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0
import asyncio
from dataclasses import dataclass
from io import BytesIO
from typing import Self, Literal, List, Dict, Any, Tuple
import docx
from docx.document import Document as DocumentObject
from docx.opc.part import Part
from docx.oxml.ns import qn
from docx.oxml.text.run import CT_R
from docx.section import _Header, _Footer
from docx.text.paragraph import Paragraph
from docx.text.run import Run
from docx.table import _Cell, Table
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator
# ---------------- 辅助函数 ----------------
# [v6.2] 定义一组具有显著视觉效果的格式标签。
# 我们只在 Run 包含这些格式时才将其视为空白格式边界。
# 这避免了因字体、字号等微小变化导致的过度文本切分。
SIGNIFICANT_STYLES = frozenset([
qn('w:u'), # 下划线
qn('w:strike'), # 删除线
qn('w:dstrike'), # 双删除线
qn('w:shd'), # 底纹/背景色
qn('w:highlight'), # 荧光笔高亮
qn('w:bdr'), # 边框
qn('w:effectLst'), # 文本效果 (如发光、阴影)
qn('w:em'), # 强调标记 (着重号)
])
def is_image_run(run: Run) -> bool:
"""检查一个 Run 是否包含图片。"""
xml = getattr(run.element, 'xml', '')
return '<w:drawing' in xml or '<w:pict' in xml
def is_formatting_only_run(run: Run) -> bool:
"""
检查一个 Run 是否仅用于格式化,不包含任何应被渲染的文本。
这仅适用于其 .text 属性为 "" 的情况。
"""
return run.text == ""
# ---------- 新增修改部分 1: is_styled_whitespace_run 函数被移除 ----------
# 此函数不再需要,因为新的逻辑会根据格式变化来切分,而不是根据带格式的空格。
# ---------------------- 修改结束 ----------------------
def is_tab_run(run: Run) -> bool:
"""
检查一个 Run 是否主要代表一个制表符,应被视作格式边界。
仅当 Run 的文本内容为空或仅包含空白,且 XML 中存在 <w:tab/> 时,
才将其视为纯格式化用途的 Run。
"""
# .text 属性会将 <w:tab/> 转换成 '\t'
# 如果 .text 在去除空白后仍有内容,说明这个 Run 不仅仅是个制表符。
if run.text.strip():
return False
xml = getattr(run.element, 'xml', '')
return '<w:tab' in xml or '<w:ptab' in xml
# ---------------- 配置类 ----------------
@dataclass
class DocxTranslatorConfig(AiTranslatorConfig):
insert_mode: Literal["replace", "append", "prepend"] = "replace"
separator: str = "\n"
# ---------------- 主类 ----------------
class DocxTranslator(AiTranslator):
"""
一个基于高级结构化解析的 .docx 文件翻译器。
它能高精度保留样式,并正确处理正文、表格、页眉/脚、脚注/尾注、超链接和目录(TOC)等复杂元素。
[v6.4 - 形状文本翻译]
- 新增对形状Shapes内文本的提取与翻译支持。现在可以正确翻译标注、流程图等图形元素中的文字。
- 改进了核心遍历逻辑,使其能够深入 <w:drawing> 元素,解析内部的 <w:txbxContent>(文本框内容),
同时保持对无文本的普通图片的原有处理方式,实现鲁棒性和兼容性。
[v6.3 - 上下文感知格式切分]
- 根本性地改进了文本切分逻辑,解决了因带格式的空格(如下划线空格)导致连续短语被错误拆分的问题。
- 新逻辑不再将任何带格式的空格视为固定的边界,而是通过比较相邻文本片段的“显著格式”是否一致来决定是否合并。
- 只要格式(如下划线、高亮等)保持不变,即使中间有空格,文本也会被视为一个连续的翻译单元,
极大地提升了对标题、合同条款等格式化文本的翻译准确性和流畅性。
[v6.2 - 精确格式保留]
- 根据用户反馈,精确定义了构成“重要格式”的样式范围。
- 现在仅在空白 Run 包含下划线、背景色、边框等显著视觉样式时才将其视为边界。
- 忽略了加粗、倾斜、字体/字号变化等对空格本身无视觉效果或不重要的样式,
避免了因此对翻译句子造成不必要的切分,提升了翻译质量。
[v6.1 - 格式保留修复版]
- 修复了因合并 Run 导致下划线等格式在翻译后丢失的问题。
- 通过引入 is_styled_whitespace_run 检查,将仅包含空格但带有样式的 Run如下划线空格
视为与图片类似的不可翻译边界。
[v6.0 - 语义切分重构版]
- 重构核心逻辑,不再跳过域结果,而是将其作为语义边界来切分文本,增强了鲁棒性。
"""
IGNORED_TAGS = {
qn('w:proofErr'), qn('w:lastRenderedPageBreak'), qn('w:bookmarkStart'),
qn('w:bookmarkEnd'), qn('w:commentRangeStart'), qn('w:commentRangeEnd'),
qn('w:del'), qn('w:ins'), qn('w:moveFrom'), qn('w:moveTo'),
}
RECURSIVE_CONTAINER_TAGS = {
qn('w:smartTag'), qn('w:sdtContent'), qn('w:hyperlink'),
}
def __init__(self, config: DocxTranslatorConfig):
super().__init__(config=config)
self.chunk_size = config.chunk_size
self.translate_agent = None
if not self.skip_translate:
agent_config = SegmentsTranslateAgentConfig(
custom_prompt=config.custom_prompt, to_lang=config.to_lang, base_url=config.base_url,
api_key=config.api_key, model_id=config.model_id, temperature=config.temperature,
thinking=config.thinking, concurrent=config.concurrent, timeout=config.timeout,
logger=self.logger, glossary_dict=config.glossary_dict, retry=config.retry,
system_proxy_enable=config.system_proxy_enable
)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode
self.separator = config.separator
# ---------- 新增修改部分 2: 增加用于比较格式的辅助函数 ----------
def _get_significant_styles(self, run: Run) -> frozenset:
"""从一个 Run 中提取“显著”格式标签的集合。"""
if run is None:
return frozenset()
rPr = run.element.rPr
if rPr is None:
return frozenset()
return frozenset(child.tag for child in rPr if child.tag in SIGNIFICANT_STYLES)
def _have_same_significant_styles(self, run1: Run, run2: Run) -> bool:
"""检查两个 Run 是否具有相同的“显著”格式集合。"""
styles1 = self._get_significant_styles(run1)
styles2 = self._get_significant_styles(run2)
return styles1 == styles2
# ---------------------- 修改结束 ----------------------
# ---------- 代码修改部分 1: 形状翻译逻辑的核心实现 ----------
def _process_element_children(self, element, parent_paragraph: Paragraph, elements: List[Dict[str, Any]],
texts: List[str],
state: Dict[str, Any]):
def flush_segment():
current_runs = state['current_runs']
if not current_runs:
return
full_text = "".join(r.text for r in current_runs)
if full_text.strip():
elements.append({"type": "text_runs", "runs": list(current_runs)})
texts.append(full_text)
state['current_runs'].clear()
for child in element:
if child.tag in self.IGNORED_TAGS:
continue
if child.tag in self.RECURSIVE_CONTAINER_TAGS:
flush_segment()
self._process_element_children(child, parent_paragraph, elements, texts, state)
flush_segment() # 在递归容器后也刷新,确保其内容成为独立片段
continue
field_char_element = child.find(qn('w:fldChar')) if isinstance(child, CT_R) else None
if field_char_element is not None:
fld_type = field_char_element.get(qn('w:fldCharType'))
if fld_type == 'begin' or fld_type == 'end':
flush_segment()
continue
if isinstance(child, CT_R):
# 传入 parent_paragraph 以确保 Run 对象具有正确的上下文
run = Run(child, parent_paragraph)
# 新增逻辑处理形状drawing/pict内的文本
# 形状可以包含文本框,需要优先于图片处理逻辑进行解析
if '<w:drawing' in run.element.xml or '<w:pict' in run.element.xml:
# 使用 list() 消耗迭代器,以便检查是否找到了文本框
text_boxes = list(run.element.iter(qn('w:txbxContent')))
if text_boxes:
flush_segment() # 包含文本的形状是一个边界,刷新前面的文本
for txbx_content in text_boxes:
# 遍历文本框内的所有段落
for p_element in txbx_content.findall(qn('w:p')):
# 创建新的段落对象,并传入父级上下文
shape_para = Paragraph(p_element, parent_paragraph)
# 递归处理该段落
self._process_paragraph(shape_para, elements, texts)
# 如果处理了形状内的文本,则该 Run 的任务已完成
continue
# 保留原有逻辑: 检查绝对边界(图片、制表符等)
if is_image_run(run) or is_formatting_only_run(run) or is_tab_run(run):
flush_segment()
continue # 这些 Run 本身不包含在任何文本片段中
# 保留原有逻辑: 基于格式变化进行切分
last_run_in_segment = state['current_runs'][-1] if state['current_runs'] else None
if last_run_in_segment and not self._have_same_significant_styles(last_run_in_segment, run):
flush_segment()
# 将当前 Run 添加到片段中
state['current_runs'].append(run)
else:
# 遇到任何非 Run 的块级元素(如在单元格中嵌套的表格),都应结束当前文本片段。
flush_segment()
def _process_paragraph(self, para: Paragraph, elements: List[Dict[str, Any]], texts: List[str]):
state = {
'current_runs': [],
}
# 修改调用:传入 `para` 对象作为父级上下文
self._process_element_children(para._p, para, elements, texts, state)
# 确保在段落处理结束时,刷新所有剩余的 Run
current_runs = state['current_runs']
if current_runs:
full_text = "".join(r.text for r in current_runs)
if full_text.strip():
elements.append({"type": "text_runs", "runs": list(current_runs)})
texts.append(full_text)
current_runs.clear()
# ---------------------- 修改结束 ----------------------
def _process_body_elements(self, parent_element, container, elements: List[Dict[str, Any]], texts: List[str]):
""" 遍历一个容器内的所有顶级元素(段落、表格、内容控件等) """
for child_element in parent_element:
if child_element.tag.endswith('p'):
self._process_paragraph(Paragraph(child_element, container), elements, texts)
elif child_element.tag.endswith('tbl'):
table = Table(child_element, container)
for row in table.rows:
for cell in row.cells:
self._traverse_container(cell, elements, texts)
elif child_element.tag.endswith('sdt'):
sdt_content = child_element.find(qn('w:sdtContent'))
if sdt_content is not None:
self._process_body_elements(sdt_content, container, elements, texts)
def _traverse_container(self, container: Any, elements: List[Dict[str, Any]], texts: List[str]):
"""
[核心导航员 v2.0 - 增强版] 健壮地遍历任何文本容器,
特别为具有额外嵌套层的 Part (如脚注/尾注) 提供了专门处理逻辑。
"""
if container is None:
return
parent_element = None
if isinstance(container, (DocumentObject, Part)):
parent_element = container.element.body if hasattr(container.element, 'body') else container.element
elif isinstance(container, (_Cell, _Header, _Footer)):
parent_element = container._element
else:
self.logger.warning(f"跳过未知类型的容器: {type(container)}")
return
if parent_element is not None and parent_element.tag in [qn('w:footnotes'), qn('w:endnotes')]:
for note_element in parent_element:
self._process_body_elements(note_element, container, elements, texts)
elif parent_element is not None:
self._process_body_elements(parent_element, container, elements, texts)
def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]:
doc = docx.Document(BytesIO(document.content))
elements, texts = [], []
self._traverse_container(doc, elements, texts)
for section in doc.sections:
self._traverse_container(section.header, elements, texts)
self._traverse_container(section.first_page_header, elements, texts)
self._traverse_container(section.even_page_header, elements, texts)
self._traverse_container(section.footer, elements, texts)
self._traverse_container(section.first_page_footer, elements, texts)
self._traverse_container(section.even_page_footer, elements, texts)
if hasattr(doc.part, 'footnotes_part') and doc.part.footnotes_part is not None:
self._traverse_container(doc.part.footnotes_part, elements, texts)
if hasattr(doc.part, 'endnotes_part') and doc.part.endnotes_part is not None:
self._traverse_container(doc.part.endnotes_part, elements, texts)
return doc, elements, texts
def _apply_translation(self, element_info: Dict[str, Any], final_text: str):
if element_info["type"] == "text_runs":
runs = element_info["runs"]
if not runs: return
first_real_run_index = -1
# 找到第一个可以写入文本的run
for i, run in enumerate(runs):
if run.element.getparent() is not None:
run.text = final_text
first_real_run_index = i
break
# 如果没有找到有效的run例如它们都已被删除则记录警告
if first_real_run_index == -1:
self.logger.warning(f"无法应用翻译 '{final_text}'因为找不到有效的run。")
return
# 删除所有后续的run因为它们的文本已经被合并到第一个run中了
for i in range(first_real_run_index + 1, len(runs)):
run = runs[i]
parent_element = run.element.getparent()
if parent_element is not None:
try:
parent_element.remove(run.element)
except ValueError:
# 在某些复杂情况下一个run可能已经被其父元素隐式删除
self.logger.debug(f"尝试删除一个不存在的run元素。这通常是安全的。")
pass
def _after_translate(self, doc: DocumentObject, elements: List[Dict[str, Any]], translated: List[str],
originals: List[str]) -> bytes:
if len(elements) != len(translated):
self.logger.error(
f"翻译数量不匹配!原文: {len(originals)}, 译文: {len(translated)}. 将只处理公共部分。")
min_len = min(len(elements), len(translated), len(originals))
elements, translated, originals = elements[:min_len], translated[:min_len], originals[:min_len]
for info, orig, trans in zip(elements, originals, translated):
if self.insert_mode == "replace":
final_text = trans
elif self.insert_mode == "append":
final_text = orig + self.separator + trans
elif self.insert_mode == "prepend":
final_text = trans + self.separator + orig
else:
final_text = trans
self._apply_translation(info, final_text)
doc_output_stream = BytesIO()
doc.save(doc_output_stream)
return doc_output_stream.getvalue()
def translate(self, document: Document) -> Self:
doc, elements, originals = self._pre_translate(document)
if not originals:
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = self._after_translate(doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = self.glossary_agent.send_segments(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = self.translate_agent.send_segments(originals,
self.chunk_size) if self.translate_agent else originals
document.content = self._after_translate(doc, elements, translated, originals)
return self
async def translate_async(self, document: Document) -> Self:
doc, elements, originals = await asyncio.to_thread(self._pre_translate, document)
if not originals:
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = await asyncio.to_thread(self._after_translate, doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = await self.glossary_agent.send_segments_async(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = await self.translate_agent.send_segments_async(originals,
self.chunk_size) if self.translate_agent else originals
document.content = await asyncio.to_thread(self._after_translate, doc, elements, translated, originals)
return self