优化docx翻译

This commit is contained in:
xunbu
2025-10-14 14:52:22 +08:00
parent 99ece79b6c
commit 244cdd8fc7

View File

@@ -8,10 +8,10 @@ from typing import Self, Literal, List, Dict, Any, Tuple
import docx
from docx.document import Document as DocumentObject
from docx.oxml.ns import qn
from docx.oxml.shared import OxmlElement
from docx.oxml.text.run import CT_R
from docx.text.paragraph import Paragraph
from docx.text.run import Run
from docx.table import _Cell, Table
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
@@ -27,17 +27,12 @@ def is_image_run(run: Run) -> bool:
def is_formatting_only_run(run: Run) -> bool:
"""
检查一个 Run 是否主要用于格式化,例如
- 一个空的粗体/斜体/下划线 Run。
- 一个只包含空格但有下划线的 Run (用于画线)。
检查一个 Run 是否主要用于格式化,例如一个空的粗体/斜体/下划线 Run。
"""
text = run.text
# 如果文本为空或只包含空格
if not text.strip():
# 并且它带有任何一种常见的格式,就认为它是一个格式化标记
if run.underline or run.bold or run.italic or run.font.strike or run.font.subscript or run.font.superscript:
return True
# 特别处理:如果文本是空格且有下划线,这几乎总是为了画线
if text and run.underline:
return True
return False
@@ -53,29 +48,20 @@ class DocxTranslatorConfig(AiTranslatorConfig):
# ---------------- 主类 ----------------
class DocxTranslator(AiTranslator):
"""
用于翻译 .docx 文件的高级翻译器,能够高精度保留样式、处理超链接、
域代码(如图注),并支持翻译脚注、尾注等
[v3.6 - 引入递归解析以处理嵌套内容标签,如 smartTag]
一个基于高级结构化解析的 .docx 文件翻译器。
它能高精度保留样式,并正确处理正文、表格、页眉/脚、脚注/尾注、超链接和目录(TOC)等复杂元素
[v4.1 - 稳定版重构]
- 移除所有对 python-docx 内部类的脆弱导入和猴子补丁。
- 实现了一个健壮的、基于鸭子类型的 _traverse_container 函数来统一处理所有文本容器。
"""
# 包含所有应被解析器完全忽略的、不影响文本内容的元数据标签
IGNORED_TAGS = {
qn('w:proofErr'), # 拼写和语法错误标记
qn('w:lastRenderedPageBreak'), # 上次渲染的分页符位置
qn('w:bookmarkStart'), # 书签开始
qn('w:bookmarkEnd'), # 书签结束
qn('w:commentRangeStart'), # 批注范围开始
qn('w:commentRangeEnd'), # 批注范围结束
qn('w:del'), # 修订:删除
qn('w:ins'), # 修订:插入
qn('w:moveFrom'), # 修订:移动源
qn('w:moveTo'), # 修订:移动目标
qn('w:proofErr'), qn('w:lastRenderedPageBreak'), qn('w:bookmarkStart'),
qn('w:bookmarkEnd'), qn('w:commentRangeStart'), qn('w:commentRangeEnd'),
qn('w:del'), qn('w:ins'), qn('w:moveFrom'), qn('w:moveTo'),
}
# 包含应递归处理其内部内容的容器标签
RECURSIVE_CONTAINER_TAGS = {
qn('w:smartTag'), # 智能标记 (包含文本)
qn('w:sdtContent'), # 结构化文档标签内容 (包含文本)
qn('w:smartTag'), qn('w:sdtContent'), qn('w:hyperlink'),
}
def __init__(self, config: DocxTranslatorConfig):
@@ -84,58 +70,23 @@ class DocxTranslator(AiTranslator):
self.translate_agent = None
if not self.skip_translate:
agent_config = SegmentsTranslateAgentConfig(
custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
base_url=config.base_url,
api_key=config.api_key,
model_id=config.model_id,
temperature=config.temperature,
thinking=config.thinking,
concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger,
glossary_dict=config.glossary_dict,
retry=config.retry,
custom_prompt=config.custom_prompt, to_lang=config.to_lang, base_url=config.base_url,
api_key=config.api_key, model_id=config.model_id, temperature=config.temperature,
thinking=config.thinking, concurrent=config.concurrent, timeout=config.timeout,
logger=self.logger, glossary_dict=config.glossary_dict, retry=config.retry,
system_proxy_enable=config.system_proxy_enable
)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode
self.separator = config.separator
@staticmethod
def _extract_hyperlink_text(hyperlink_element) -> str:
"""从超链接 OXML 元素中提取所有显示文本。"""
return ''.join(
t.text for t in hyperlink_element.findall('.//w:t', namespaces=hyperlink_element.nsmap) if t.text
)
def _is_seq_field(self, child) -> bool:
"""判断一个 OXML 元素是否为 SEQ 域的一部分(如图、表编号)。"""
try:
if child.tag == qn('w:fldSimple'):
instr = child.get(qn('w:instr'), '')
if 'SEQ' in instr: return True
if child.tag == qn('w:r'):
for instr_text in child.findall('.//w:instrText', namespaces=child.nsmap):
if instr_text.text and 'SEQ' in instr_text.text:
return True
except Exception:
pass
return False
def _process_element_children(self, element, elements: List[Dict[str, Any]], texts: List[str],
state: Dict[str, Any]):
"""
[新函数] 递归处理任何给定XML元素的子节点。
'state' 字典用于跨递归调用传递状态,如 current_runs 和 is_inside_field。
"""
current_runs = state['current_runs']
def flush_segment():
nonlocal current_runs
if not current_runs:
return
if not current_runs: return
full_text = "".join(r.text for r in current_runs)
if full_text.strip():
elements.append({"type": "text_runs", "runs": current_runs})
@@ -146,62 +97,38 @@ class DocxTranslator(AiTranslator):
for child in element:
if child.tag in self.IGNORED_TAGS:
continue
if child.tag in self.RECURSIVE_CONTAINER_TAGS:
self._process_element_children(child, elements, texts, state)
continue
if child.tag == qn('w:hyperlink'):
flush_segment()
hyperlink_text = self._extract_hyperlink_text(child)
if hyperlink_text.strip():
elements.append({"type": "hyperlink", "element": child})
texts.append(hyperlink_text)
continue
field_char_element = None
if child.tag == qn('w:fldChar'):
field_char_element = child
elif isinstance(child, CT_R):
field_char_element = child.find(qn('w:fldChar'))
field_char_element = child.find(qn('w:fldChar')) if isinstance(child, CT_R) else (
child if child.tag == qn('w:fldChar') else None)
if field_char_element is not None:
flush_segment()
fld_type = field_char_element.get(qn('w:fldCharType'))
if fld_type == 'begin':
state['is_inside_field'] = True
state['field_depth'] += 1
elif fld_type == 'end':
state['is_inside_field'] = False
state['field_depth'] = max(0, state['field_depth'] - 1)
continue
if state.get('is_inside_field', False):
continue
if self._is_seq_field(child):
flush_segment()
continue
if isinstance(child, CT_R):
if child.find(qn('w:instrText')) is not None:
continue
if state['field_depth'] > 0:
continue
run = Run(child, None)
if is_image_run(run) or run.element.find(qn('w:tab')) is not None or is_formatting_only_run(run):
if is_image_run(run) or is_formatting_only_run(run):
flush_segment()
else:
current_runs.append(run)
else:
flush_segment()
state['current_runs'] = current_runs
def _process_paragraph(self, para: Paragraph, elements: List[Dict[str, Any]], texts: List[str]):
"""
[重构] 作为递归处理器的入口点,初始化状态并调用递归函数。
"""
if not para.text.strip():
return
state = {'current_runs': [], 'is_inside_field': False}
state = {'current_runs': [], 'field_depth': 0}
self._process_element_children(para._p, elements, texts, state)
current_runs = state['current_runs']
if current_runs:
full_text = "".join(r.text for r in current_runs)
@@ -209,90 +136,78 @@ class DocxTranslator(AiTranslator):
elements.append({"type": "text_runs", "runs": current_runs})
texts.append(full_text)
def _process_container(self, container, elements: List[Dict[str, Any]], texts: List[str]):
"""递归处理包含段落和表格的容器(如文档、单元格、页眉)。"""
if not container: return
for para in getattr(container, 'paragraphs', []):
self._process_paragraph(para, elements, texts)
for table in getattr(container, 'tables', []):
def _traverse_container(self, container, elements: List[Dict[str, Any]], texts: List[str]):
"""
[核心导航员] 健壮地遍历任何文本容器 (Document, _Cell, _Header, etc.)。
"""
if container is None:
return
# --- 关键修复 ---
# 通过检查属性来确定如何获取子元素,而不是依赖于对象的具体类型。
# 这种方法对所有类型的容器都有效。
parent_element = None
if hasattr(container, 'element') and hasattr(container.element, 'body'):
parent_element = container.element.body # For Document
elif hasattr(container, '_element'):
parent_element = container._element # For _Cell, _Header, _Footer, NotesParts
if parent_element is None:
return
for child_element in parent_element:
if child_element.tag.endswith('p'):
self._process_paragraph(Paragraph(child_element, container), elements, texts)
elif child_element.tag.endswith('tbl'):
table = Table(child_element, container)
for row in table.rows:
for cell in row.cells:
self._process_container(cell, elements, texts)
def _process_part(self, doc_part, elements: List[Dict[str, Any]], texts: List[str]):
"""处理文档的非主内容部分,如脚注、尾注。"""
if not doc_part: return
for para_element in doc_part.element.findall('.//w:p', namespaces=doc_part.element.nsmap):
try:
self._process_paragraph(Paragraph(para_element, doc_part), elements, texts)
except Exception as e:
self.logger.warning(f"处理文档部件段落时出错: {e}")
self._traverse_container(cell, elements, texts)
def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]:
doc = docx.Document(BytesIO(document.content))
elements, texts = [], []
# 1. 处理主文档内容
self._process_container(doc, elements, texts)
self._traverse_container(doc, elements, texts)
# 2. 处理所有类型的页眉和页脚
# 2. 处理所有的页眉和页脚
for section in doc.sections:
self._process_container(section.header, elements, texts)
self._process_container(section.first_page_header, elements, texts)
self._process_container(section.even_page_header, elements, texts)
self._process_container(section.footer, elements, texts)
self._process_container(section.first_page_footer, elements, texts)
self._process_container(section.even_page_footer, elements, texts)
self._traverse_container(section.header, elements, texts)
self._traverse_container(section.first_page_header, elements, texts)
self._traverse_container(section.even_page_header, elements, texts)
self._traverse_container(section.footer, elements, texts)
self._traverse_container(section.first_page_footer, elements, texts)
self._traverse_container(section.even_page_footer, elements, texts)
# 3. 处理脚注尾注
if part := getattr(doc.part, 'footnotes_part', None): self._process_part(part, elements, texts)
if part := getattr(doc.part, 'endnotes_part', None): self._process_part(part, elements, texts)
# 3. 处理脚注尾注
if hasattr(doc.part, 'footnotes_part') and doc.part.footnotes_part is not None:
self._traverse_container(doc.part.footnotes_part, elements, texts)
if hasattr(doc.part, 'endnotes_part') and doc.part.endnotes_part is not None:
self._traverse_container(doc.part.endnotes_part, elements, texts)
return doc, elements, texts
def _apply_translation(self, element_info: Dict[str, Any], final_text: str):
"""
将翻译后的文本写回对应的 OXML 元素。
对于多Run的文本段写入第一个Run并【删除】其余以避免产生方框占位符。
"""
el_type = element_info["type"]
if el_type == "text_runs":
if element_info["type"] == "text_runs":
runs = element_info["runs"]
if not runs:
return
if not runs: return
runs[0].text = final_text
for run in runs[1:]:
p_element = run.element.getparent()
if p_element is not None:
p_element.remove(run.element)
elif el_type == "hyperlink":
hyperlink = element_info["element"]
r_elements = hyperlink.findall(f'.//{qn("w:r")}')
if r_elements:
first_r = r_elements[0]
for t in first_r.findall(f'.//{qn("w:t")}'):
first_r.remove(t)
new_t = OxmlElement('w:t')
new_t.text = final_text
new_t.set(qn('xml:space'), 'preserve')
first_r.append(new_t)
for other_r in r_elements[1:]:
if (parent := other_r.getparent()) is not None:
parent.remove(other_r)
parent_element = run.element.getparent()
if parent_element is not None:
try:
parent_element.remove(run.element)
except ValueError:
pass
def _after_translate(self, doc: DocumentObject, elements: List[Dict[str, Any]], translated: List[str],
originals: List[str]) -> bytes:
if len(elements) != len(translated):
self.logger.error(
f"Translation count mismatch! Originals: {len(originals)}, Translated: {len(translated)}. Processing common part only.")
f"翻译数量不匹配!原文: {len(originals)}, 译文: {len(translated)}. 将只处理公共部分。")
min_len = min(len(elements), len(translated), len(originals))
elements, translated, originals = elements[:min_len], translated[:min_len], originals[:min_len]
for info, orig, trans in zip(elements, originals, translated):
if self.insert_mode == "replace":
final_text = trans
@@ -303,7 +218,6 @@ class DocxTranslator(AiTranslator):
else:
final_text = trans
self._apply_translation(info, final_text)
doc_output_stream = BytesIO()
doc.save(doc_output_stream)
return doc_output_stream.getvalue()
@@ -311,35 +225,29 @@ class DocxTranslator(AiTranslator):
def translate(self, document: Document) -> Self:
doc, elements, originals = self._pre_translate(document)
if not originals:
self.logger.info("\nNo translatable text content found in the document.")
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = self._after_translate(doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = self.glossary_agent.send_segments(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = self.translate_agent.send_segments(originals,
self.chunk_size) if self.translate_agent else originals
document.content = self._after_translate(doc, elements, translated, originals)
return self
async def translate_async(self, document: Document) -> Self:
doc, elements, originals = await asyncio.to_thread(self._pre_translate, document)
if not originals:
self.logger.info("\nNo translatable text content found in the document.")
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = await asyncio.to_thread(self._after_translate, doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = await self.glossary_agent.send_segments_async(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = await self.translate_agent.send_segments_async(originals,
self.chunk_size) if self.translate_agent else originals
document.content = await asyncio.to_thread(self._after_translate, doc, elements, translated, originals)
return self