优化docx

This commit is contained in:
xunbu
2025-10-14 18:55:38 +08:00
parent 0e4f3dba61
commit b6916da71c

View File

@@ -25,6 +25,24 @@ def is_image_run(run: Run) -> bool:
return '<w:drawing' in xml or '<w:pict' in xml
def is_formatting_only_run(run: Run) -> bool:
"""
检查一个 Run 是否主要用于格式化,例如一个空的粗体/斜体/下划线 Run。
"""
text = run.text
if not text.strip():
# Handles empty runs with formatting
if run.underline or run.bold or run.italic or run.font.strike or run.font.subscript or run.font.superscript:
return True
# Handles runs that are just whitespace but have formatting that might be visually significant
if text and run.underline:
return True
# A simple tab run is also considered formatting-only for our purpose
if text == '\t':
return True
return False
# ---------------- 配置类 ----------------
@dataclass
class DocxTranslatorConfig(AiTranslatorConfig):
@@ -38,12 +56,15 @@ class DocxTranslator(AiTranslator):
一个基于高级结构化解析的 .docx 文件翻译器。
它能高精度保留样式,并正确处理正文、表格、页眉/脚、脚注/尾注、超链接和目录(TOC)等复杂元素。
[v6.1 - 笔误修复版]
- 修复了 qn() 函数调用中的一个笔误 ('w:w:fldCharType'),该错误会导致程序崩溃
- 实现了基于“有效视觉样式”的智能分段逻辑。
- 修复了对域代码Fields结果文本的错误跳过问题
- 新增了对结构化文档标签SDT的递归解析。
- 修复了因页眉/页脚对象共享导致文本被重复提取和翻译的问题。
[v4.2 - 修复版]
- 修复了对域代码Fields结果文本的错误跳过问题确保目录条目可被翻译
- 新增了对结构化文档标签Structured Document Tags, SDT的递归解析
确保由内容控件(如自动目录)包裹的内容可以被正确处理
[v5.0 - 增强版]
- 引入了智能域处理状态机,精确识别并跳过 PAGEREF (页码) 和 SEQ (序号) 等不应翻译的动态域内容。
- 优化了文本切分逻辑,解决了目录(TOC)和图表目录(TOF)条目被错误拆分为“标题”和“页码”两部分的问题。
- 根除了因复杂域处理不当导致的目录项重复翻译问题,确保每个条目只被提取和翻译一次。
"""
IGNORED_TAGS = {
qn('w:proofErr'), qn('w:lastRenderedPageBreak'), qn('w:bookmarkStart'),
@@ -53,6 +74,8 @@ class DocxTranslator(AiTranslator):
RECURSIVE_CONTAINER_TAGS = {
qn('w:smartTag'), qn('w:sdtContent'), qn('w:hyperlink'),
}
# [v5.0] 定义不应翻译其结果的域指令
SKIPPABLE_FIELD_INSTRUCTIONS = {'PAGEREF', 'SEQ'}
def __init__(self, config: DocxTranslatorConfig):
super().__init__(config=config)
@@ -70,97 +93,99 @@ class DocxTranslator(AiTranslator):
self.insert_mode = config.insert_mode
self.separator = config.separator
def _get_run_style_signature(self, run: Run) -> tuple:
"""
获取 Run 的“有效视觉样式”签名。
通过比较 python-docx 计算后的最终属性(如字体、大小、颜色等),
可以正确处理样式继承,比直接比较 XML 更健壮。
"""
f = run.font
return (
f.name,
f.size,
f.bold,
f.italic,
f.underline,
f.strike,
f.all_caps,
f.small_caps,
f.color.rgb if f.color and f.color.rgb is not None else None,
f.highlight_color,
)
def _process_element_children(self, element, elements: List[Dict[str, Any]], texts: List[str],
state: Dict[str, Any]):
current_runs = state['current_runs']
def flush_segment():
nonlocal current_runs
if not current_runs: return
current_runs = state['current_runs']
if not current_runs:
return
full_text = "".join(r.text for r in current_runs)
if full_text.strip():
elements.append({"type": "text_runs", "runs": current_runs})
elements.append({"type": "text_runs", "runs": list(current_runs)})
texts.append(full_text)
current_runs = []
state['current_runs'] = current_runs
state['current_runs'].clear()
for child in element:
if child.tag in self.IGNORED_TAGS:
continue
if child.tag in self.RECURSIVE_CONTAINER_TAGS:
# [v5.0] 递归前刷新,确保容器前的内容已保存
flush_segment()
self._process_element_children(child, elements, texts, state)
continue
# --- [v5.0] 智能域处理逻辑 ---
# 检查是否为域指令文本 (instrText)
instr_text_element = child.find(qn('w:instrText')) if isinstance(child, CT_R) else None
if instr_text_element is not None:
instr_text = instr_text_element.text.strip()
# 检查指令是否属于需要跳过的类型
if any(keyword in instr_text for keyword in self.SKIPPABLE_FIELD_INSTRUCTIONS):
state['is_in_skippable_field'] = True
continue # 无论如何都跳过指令文本本身的处理
# 检查是否为域字符 (fldChar)
field_char_element = child.find(qn('w:fldChar')) if isinstance(child, CT_R) else (
child if child.tag == qn('w:fldChar') else None)
if field_char_element is not None:
flush_segment()
# 【笔误修复】: 修正了 'w:w:fldCharType' 为 'w:fldCharType'
fld_type = field_char_element.get(qn('w:fldCharType'))
if fld_type == 'begin':
state['field_depth'] += 1
flush_segment()
# 重置子域的状态
state['is_in_skippable_field'] = False
state['is_skipping_result'] = False
elif fld_type == 'separate':
# 如果这是一个我们标记为要跳过的域,现在开始跳过其结果
if state.get('is_in_skippable_field'):
flush_segment() # 刷新域之前的所有文本 (如目录标题)
state['is_skipping_result'] = True
elif fld_type == 'end':
state['field_depth'] = max(0, state['field_depth'] - 1)
# 域结束,恢复正常处理
state['is_in_skippable_field'] = False
state['is_skipping_result'] = False
continue
# 如果当前状态是跳过域结果,则忽略这个子元素
if state.get('is_skipping_result'):
continue
# --- 域处理逻辑结束 ---
if isinstance(child, CT_R):
if child.find(qn('w:instrText')) is not None:
continue
run = Run(child, None)
if is_image_run(run):
if is_image_run(run) or is_formatting_only_run(run):
flush_segment()
continue
if not run.text:
continue
current_run_style_sig = self._get_run_style_signature(run)
if not current_runs:
current_runs.append(run)
else:
last_run_style_sig = self._get_run_style_signature(current_runs[-1])
if current_run_style_sig == last_run_style_sig:
current_runs.append(run)
state['current_runs'].append(run)
else:
flush_segment()
current_runs.append(run)
else:
flush_segment()
state['current_runs'] = current_runs
# 在元素处理结束后,确保最后一部分也被刷新
# [v5.0] 此处的刷新由调用者 (_process_paragraph) 控制,以避免递归中过早刷新
def _process_paragraph(self, para: Paragraph, elements: List[Dict[str, Any]], texts: List[str]):
if not para.text.strip():
return
state = {'current_runs': [], 'field_depth': 0}
# [v5.0] 为每个段落初始化独立的状态
state = {
'current_runs': [],
'is_in_skippable_field': False, # 是否在PAGEREF等域的指令部分
'is_skipping_result': False # 是否正在跳过PAGEREF等域的结果部分
}
self._process_element_children(para._p, elements, texts, state)
# [v5.0] 处理完一个段落的所有子元素后刷新剩余的runs
current_runs = state['current_runs']
if current_runs:
full_text = "".join(r.text for r in current_runs)
if full_text.strip():
elements.append({"type": "text_runs", "runs": current_runs})
elements.append({"type": "text_runs", "runs": list(current_runs)})
texts.append(full_text)
current_runs.clear()
def _process_body_elements(self, parent_element, container, elements: List[Dict[str, Any]], texts: List[str]):
""" 遍历一个容器内的所有顶级元素(段落、表格、内容控件等) """
@@ -196,32 +221,24 @@ class DocxTranslator(AiTranslator):
def _pre_translate(self, document: Document) -> Tuple[DocumentObject, List[Dict[str, Any]], List[str]]:
doc = docx.Document(BytesIO(document.content))
elements, texts = [], []
processed_container_ids = set()
def process_once(container):
"""一个辅助函数,确保每个容器只被处理一次"""
if container is None or id(container) in processed_container_ids:
return
processed_container_ids.add(id(container))
self._traverse_container(container, elements, texts)
# 1. 处理主文档内容
process_once(doc)
self._traverse_container(doc, elements, texts)
# 2. 处理所有节的页眉和页脚
for section in doc.sections:
process_once(section.header)
process_once(section.first_page_header)
process_once(section.even_page_header)
process_once(section.footer)
process_once(section.first_page_footer)
process_once(section.even_page_footer)
self._traverse_container(section.header, elements, texts)
self._traverse_container(section.first_page_header, elements, texts)
self._traverse_container(section.even_page_header, elements, texts)
self._traverse_container(section.footer, elements, texts)
self._traverse_container(section.first_page_footer, elements, texts)
self._traverse_container(section.even_page_footer, elements, texts)
# 3. 处理脚注和尾注
if hasattr(doc.part, 'footnotes_part') and doc.part.footnotes_part is not None:
process_once(doc.part.footnotes_part)
self._traverse_container(doc.part.footnotes_part, elements, texts)
if hasattr(doc.part, 'endnotes_part') and doc.part.endnotes_part is not None:
process_once(doc.part.endnotes_part)
self._traverse_container(doc.part.endnotes_part, elements, texts)
return doc, elements, texts
@@ -229,13 +246,30 @@ class DocxTranslator(AiTranslator):
if element_info["type"] == "text_runs":
runs = element_info["runs"]
if not runs: return
runs[0].text = final_text
for run in runs[1:]:
# [v5.0] 改进合并逻辑更稳健地处理空runs
first_real_run_index = -1
# 将翻译文本赋给第一个有效的run
for i, run in enumerate(runs):
if run.element.getparent() is not None:
run.text = final_text
first_real_run_index = i
break
if first_real_run_index == -1:
self.logger.warning(f"无法应用翻译 '{final_text}'因为找不到有效的run。")
return
# 删除其余的runs
for i in range(first_real_run_index + 1, len(runs)):
run = runs[i]
parent_element = run.element.getparent()
if parent_element is not None:
try:
parent_element.remove(run.element)
except ValueError:
# 在某些复杂情况下,元素可能已被其父元素的其他操作移除
self.logger.debug(f"尝试删除一个不存在的run元素。这通常是安全的。")
pass
def _after_translate(self, doc: DocumentObject, elements: List[Dict[str, Any]], translated: List[str],
@@ -245,6 +279,7 @@ class DocxTranslator(AiTranslator):
f"翻译数量不匹配!原文: {len(originals)}, 译文: {len(translated)}. 将只处理公共部分。")
min_len = min(len(elements), len(translated), len(originals))
elements, translated, originals = elements[:min_len], translated[:min_len], originals[:min_len]
for info, orig, trans in zip(elements, originals, translated):
if self.insert_mode == "replace":
final_text = trans
@@ -255,6 +290,7 @@ class DocxTranslator(AiTranslator):
else:
final_text = trans
self._apply_translation(info, final_text)
doc_output_stream = BytesIO()
doc.save(doc_output_stream)
return doc_output_stream.getvalue()
@@ -265,10 +301,12 @@ class DocxTranslator(AiTranslator):
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = self._after_translate(doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = self.glossary_agent.send_segments(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = self.translate_agent.send_segments(originals,
self.chunk_size) if self.translate_agent else originals
document.content = self._after_translate(doc, elements, translated, originals)
@@ -276,14 +314,17 @@ class DocxTranslator(AiTranslator):
async def translate_async(self, document: Document) -> Self:
doc, elements, originals = await asyncio.to_thread(self._pre_translate, document)
# print(f"【测试】originals\n:{originals}") # 保持您的测试输出
if not originals:
self.logger.info("\n文档中未找到可翻译的文本内容。")
document.content = await asyncio.to_thread(self._after_translate, doc, elements, [], [])
return self
if self.glossary_agent:
self.glossary_dict_gen = await self.glossary_agent.send_segments_async(originals, self.chunk_size)
if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
translated = await self.translate_agent.send_segments_async(originals,
self.chunk_size) if self.translate_agent else originals
document.content = await asyncio.to_thread(self._after_translate, doc, elements, translated, originals)