修复区域翻译功能

This commit is contained in:
xunbu
2025-12-07 20:02:46 +08:00
parent 0d80eaac3c
commit 99c5d2c7c7

View File

@@ -3,12 +3,13 @@
import asyncio import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from io import BytesIO from io import BytesIO
from typing import Self, Literal, List, Optional from typing import Self, Literal, List, Optional, Dict, Tuple, Set
import zipfile import zipfile
import re # 引入正则用于解析简写
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import openpyxl # openpyxl 仍然保留,以备将来可能需要混合模式或用于其他目的 # 仅导入 openpyxl 的工具函数用于坐标计算,不加载 workbook 对象
from openpyxl.cell import Cell from openpyxl.utils.cell import coordinate_to_tuple, range_boundaries
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document from docutranslate.ir.document import Document
@@ -20,8 +21,8 @@ class XlsxTranslatorConfig(AiTranslatorConfig):
insert_mode: Literal["replace", "append", "prepend"] = "replace" insert_mode: Literal["replace", "append", "prepend"] = "replace"
separator: str = "\n" separator: str = "\n"
# 指定翻译区域列表。 # 指定翻译区域列表。
# 示例: ["Sheet1!A1:B10", "C:D", "E5"] # 示例: ["Sheet1!A1:B10", "C", "3"] (支持简写: C代表C列, 3代表第3行)
# 如果不指定表名 (如 "C:D"),则应用于所有表。 # 如果不指定表名 (如 "C"),则应用于所有表。
# 如果为 None 或空列表,则翻译整个文件中的所有文本。 # 如果为 None 或空列表,则翻译整个文件中的所有文本。
translate_regions: Optional[List[str]] = None translate_regions: Optional[List[str]] = None
@@ -53,113 +54,332 @@ class XlsxTranslator(AiTranslator):
self.separator = config.separator self.separator = config.separator
self.translate_regions = config.translate_regions self.translate_regions = config.translate_regions
def _get_texts_to_translate(self, document: Document) -> List[str]: # 命名空间定义
""" self.ns = {
【已修改】通过直接解析内部XML文件来识别需要翻译的文本。 'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main',
这种方法可以正确处理包含富文本的单元格,并确保与重建逻辑一致,但不支持按区域翻译。 'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'
""" }
if self.translate_regions: ET.register_namespace('', self.ns['main'])
self.logger.warning("当前文本提取方法直接解析XML不支持 'translate_regions'。将翻译文件中的所有文本内容。")
texts_to_translate = set() # =========================================================================
ns = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'} # 辅助方法:无需加载 Workbook 即可解析结构
# =========================================================================
def _get_shared_strings(self, zf: zipfile.ZipFile) -> List[str]:
"""解析共享字符串表,返回字符串列表。"""
if "xl/sharedStrings.xml" not in zf.namelist():
return []
shared_strings = []
with zf.open("xl/sharedStrings.xml") as f:
context = ET.iterparse(f, events=("end",))
for event, elem in context:
if elem.tag.endswith('}si'): # shared item
texts = [t.text for t in elem.findall('.//main:t', self.ns) if t.text]
shared_strings.append("".join(texts))
elem.clear()
return shared_strings
def _get_sheet_mapping(self, zf: zipfile.ZipFile) -> Dict[str, str]:
"""
获取 Sheet 名称到文件路径的映射。
例如: {'Sheet1': 'xl/worksheets/sheet1.xml'}
"""
sheet_name_to_rid = {}
try: try:
with zipfile.ZipFile(BytesIO(document.content), 'r') as original_zip: with zf.open("xl/workbook.xml") as f:
# --- 1. 处理共享字符串 (sharedStrings.xml) --- root = ET.fromstring(f.read())
# 这是所有文本(包括富文本片段)的主要存储位置。 for sheet in root.findall(".//main:sheet", self.ns):
if "xl/sharedStrings.xml" in original_zip.namelist(): name = sheet.get("name")
with original_zip.open("xl/sharedStrings.xml") as f: rid = sheet.get(f"{{{self.ns['r']}}}id")
root = ET.fromstring(f.read()) if name and rid:
# 查找所有 <t> 元素,无论它们在哪个层级,这能正确捕获富文本片段 sheet_name_to_rid[name] = rid
text_nodes = root.findall('.//main:t', ns) except Exception:
for node in text_nodes: return {}
# 确保节点有文本内容且不是纯粹的空白
if node.text and node.text.strip():
texts_to_translate.add(node.text)
# --- 2. 处理表格标题 (tableX.xml) --- rid_to_target = {}
# 表格的列名不存储在 sharedStrings.xml 中,需要单独处理。 try:
for item in original_zip.infolist(): with zf.open("xl/_rels/workbook.xml.rels") as f:
if item.filename.startswith("xl/tables/table"): tree = ET.parse(f)
with original_zip.open(item.filename) as f: root = tree.getroot()
root = ET.fromstring(f.read()) for child in root:
table_columns = root.findall('.//main:tableColumn', ns) rid = child.get("Id")
for col in table_columns: target = child.get("Target")
original_name = col.get('name') if rid and target:
if original_name and original_name.strip(): if not target.startswith("/"):
texts_to_translate.add(original_name) target = "xl/" + target
rid_to_target[rid] = target
except Exception:
return {}
except Exception as e: mapping = {}
self.logger.error(f"直接解析XLSX的XML文件失败: {e}") for name, rid in sheet_name_to_rid.items():
if rid in rid_to_target:
mapping[name] = rid_to_target[rid]
return mapping
def _normalize_range(self, range_str: str) -> str:
"""
将用户输入的简写转换为标准范围格式。
"C" -> "C:C"
"3" -> "3:3"
"A1" -> "A1:A1" (openpyxl range_boundaries 实际上支持 A1但这里统一处理更安全)
"""
# 纯字母 (例如 "C", "AA") -> 整列
if re.match(r"^[A-Za-z]+$", range_str):
return f"{range_str}:{range_str}"
# 纯数字 (例如 "3", "10") -> 整行
if re.match(r"^\d+$", range_str):
return f"{range_str}:{range_str}"
return range_str
def _parse_region_boundaries(self, sheet_mapping: Dict[str, str]) -> Dict[str, List[Tuple]]:
"""
解析配置的 translate_regions。
返回: { 'xl/worksheets/sheet1.xml': [(min_col, min_row, max_col, max_row), ...], ... }
"""
if not self.translate_regions:
return {}
region_map = {} # filename -> list of boundaries
global_regions = [] # list of boundaries for all sheets
for region in self.translate_regions:
sheet_name = None
raw_range = region.strip()
if "!" in raw_range:
parts = raw_range.split("!", 1)
sheet_name = parts[0].strip("'")
range_part = self._normalize_range(parts[1])
else:
range_part = self._normalize_range(raw_range)
try:
# boundaries: (min_col, min_row, max_col, max_row)
boundaries = range_boundaries(range_part)
if sheet_name:
filename = sheet_mapping.get(sheet_name)
if filename:
if filename not in region_map:
region_map[filename] = []
region_map[filename].append(boundaries)
else:
global_regions.append(boundaries)
except Exception as e:
self.logger.warning(f"无法解析区域 '{region}': {e}")
# 将全局区域添加到所有已知 Sheet
if global_regions:
all_files = set(sheet_mapping.values())
for f in all_files:
if f not in region_map:
region_map[f] = []
region_map[f].extend(global_regions)
return region_map
def _is_in_boundaries(self, col: int, row: int, boundaries_list: List[Tuple]) -> bool:
"""检查坐标 (col, row) 是否在给定的边界列表中。"""
for (min_col, min_row, max_col, max_row) in boundaries_list:
if min_col is not None and col < min_col: continue
if min_row is not None and row < min_row: continue
if max_col is not None and col > max_col: continue
if max_row is not None and row > max_row: continue
return True
return False
# =========================================================================
# 高效 XML 区域提取与重构
# =========================================================================
def _get_texts_xml_regions(self, document: Document) -> List[str]:
    """Collect the texts of shared-string cells inside the configured regions.

    Works on the raw XML only: the shared-string table is loaded once, then
    each sheet that has at least one region is streamed and every cell with
    ``t="s"`` whose coordinate lies in a region contributes the shared string
    its ``<v>`` index points to.

    Args:
        document: The workbook; ``document.content`` holds the XLSX bytes.

    Returns:
        Unique texts in first-seen order. Whitespace-only strings are left
        out, matching ``_get_texts_xml_all``. Empty when the workbook has no
        shared strings.
    """
    # Dict keys give de-duplication with a stable, reproducible order.
    texts_to_translate: Dict[str, None] = {}

    with zipfile.ZipFile(BytesIO(document.content), 'r') as zf:
        shared_strings = self._get_shared_strings(zf)
        if not shared_strings:
            return []

        sheet_mapping = self._get_sheet_mapping(zf)
        boundaries_map = self._parse_region_boundaries(sheet_mapping)
        members = set(zf.namelist())

        for filename, boundaries in boundaries_map.items():
            if filename not in members:
                continue

            with zf.open(filename) as f:
                for _event, elem in ET.iterparse(f, events=("end",)):
                    if not elem.tag.endswith('}c'):  # only cells matter
                        continue

                    r_attr = elem.get('r')  # e.g. "C5"
                    if r_attr and elem.get('t') == 's':
                        v_node = elem.find('main:v', self.ns)
                        if v_node is not None and v_node.text:
                            try:
                                # coordinate_to_tuple returns (row, col)
                                row, col = coordinate_to_tuple(r_attr)
                                idx = int(v_node.text)
                            except (ValueError, KeyError, TypeError) as e:
                                # Malformed reference or index: skip this
                                # cell, but leave a trace.
                                self.logger.warning(f"跳过无法解析的单元格 '{r_attr}': {e}")
                            else:
                                if (self._is_in_boundaries(col, row, boundaries)
                                        and 0 <= idx < len(shared_strings)):
                                    text = shared_strings[idx]
                                    if text.strip():
                                        texts_to_translate[text] = None

                    # The cell (including its <v> child) is done; free it.
                    elem.clear()

    return list(texts_to_translate)
def _rebuild_xml_regions(self, original_content_bytes: bytes, translation_map: dict) -> bytes:
    """Write a copy of the workbook with region cells replaced by translations.

    Every sheet that has configured regions is parsed; each shared-string
    cell (``t="s"``) inside a region whose text is a key of
    ``translation_map`` is converted to an inline string (``t="inlineStr"``
    with an ``<is><t>`` child) holding the new text. The shared-string table
    itself is not modified, so cells outside the regions keep their text.
    All other archive members are copied unchanged.

    Args:
        original_content_bytes: The source XLSX file.
        translation_map: Original text -> translated text.

    Returns:
        The rebuilt XLSX bytes. If the archive or a sheet cannot be processed
        the error is logged and ``original_content_bytes`` is returned, the
        same fallback ``_rebuild_xml_all`` uses.
    """
    main_ns = self.ns['main']
    is_tag = f"{{{main_ns}}}is"
    t_tag = f"{{{main_ns}}}t"
    xml_space_attr = "{http://www.w3.org/XML/1998/namespace}space"

    output_zip_io = BytesIO()
    try:
        with zipfile.ZipFile(BytesIO(original_content_bytes), 'r') as zf_in, \
                zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as zf_out:
            shared_strings = self._get_shared_strings(zf_in)
            sheet_mapping = self._get_sheet_mapping(zf_in)
            boundaries_map = self._parse_region_boundaries(sheet_mapping)

            for item in zf_in.infolist():
                raw = zf_in.read(item.filename)
                boundaries = boundaries_map.get(item.filename)
                if boundaries is None:
                    # Not a sheet with regions: copy through untouched.
                    zf_out.writestr(item, raw)
                    continue

                # NOTE(review): re-serialising through ElementTree rewrites
                # namespace prefixes other than the registered default one;
                # confirm sheets carrying extension namespaces still open.
                root = ET.fromstring(raw)
                cells_modified = False

                # findall() returns a list, so mutating cells while looping
                # is safe.
                for cell in root.findall(".//main:c", self.ns):
                    r_attr = cell.get('r')
                    if not r_attr or cell.get('t') != 's':
                        continue
                    v_node = cell.find('main:v', self.ns)
                    if v_node is None or not v_node.text:
                        continue
                    try:
                        # coordinate_to_tuple returns (row, col)
                        row, col = coordinate_to_tuple(r_attr)
                        idx = int(v_node.text)
                    except (ValueError, KeyError, TypeError) as e:
                        self.logger.warning(f"跳过无法解析的单元格 '{r_attr}': {e}")
                        continue
                    if not self._is_in_boundaries(col, row, boundaries):
                        continue
                    if not 0 <= idx < len(shared_strings):
                        continue

                    original_text = shared_strings[idx]
                    translated_text = translation_map.get(original_text)
                    if translated_text is None:
                        # No translation available: keep the cell as is.
                        continue

                    if self.insert_mode == "append":
                        final_text = original_text + self.separator + translated_text
                    elif self.insert_mode == "prepend":
                        final_text = translated_text + self.separator + original_text
                    else:
                        final_text = translated_text

                    # Convert the cell to an inline string.
                    cell.set('t', 'inlineStr')
                    cell.remove(v_node)
                    is_node = ET.SubElement(cell, is_tag)
                    t_node = ET.SubElement(is_node, t_tag)
                    # Keep leading/trailing whitespace of the text significant.
                    t_node.set(xml_space_attr, "preserve")
                    t_node.text = final_text
                    cells_modified = True

                if cells_modified:
                    zf_out.writestr(item, ET.tostring(root, encoding='utf-8', xml_declaration=True))
                else:
                    # Nothing changed: keep the original bytes verbatim.
                    zf_out.writestr(item, raw)
    except Exception as e:
        self.logger.error(f"XML重构失败: {e}")
        return original_content_bytes

    return output_zip_io.getvalue()
# =========================================================================
# 原有全文档逻辑 (针对全文档翻译保持极致速度)
# =========================================================================
def _get_texts_xml_all(self, document: Document) -> List[str]:
texts_to_translate = set()
try: try:
with zipfile.ZipFile(original_zip_io, 'r') as original_zip: with zipfile.ZipFile(BytesIO(document.content), 'r') as zf:
with zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as output_zip: if "xl/sharedStrings.xml" in zf.namelist():
for item in original_zip.infolist(): with zf.open("xl/sharedStrings.xml") as f:
file_content = original_zip.read(item.filename) root = ET.fromstring(f.read())
for node in root.findall('.//main:t', self.ns):
if node.text and node.text.strip():
texts_to_translate.add(node.text)
for item in zf.infolist():
if item.filename.startswith("xl/tables/table"):
with zf.open(item.filename) as f:
root = ET.fromstring(f.read())
for col in root.findall('.//main:tableColumn', self.ns):
if col.get('name'):
texts_to_translate.add(col.get('name'))
except Exception as e:
self.logger.error(f"XML解析失败: {e}")
return list(texts_to_translate)
def _rebuild_xml_all(self, original_content_bytes: bytes, translation_map: dict) -> bytes:
output_zip_io = BytesIO()
try:
with zipfile.ZipFile(BytesIO(original_content_bytes), 'r') as zf_in:
with zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as zf_out:
for item in zf_in.infolist():
content = zf_in.read(item.filename)
# --- 1. 处理共享字符串文件 ---
if item.filename == "xl/sharedStrings.xml": if item.filename == "xl/sharedStrings.xml":
root = ET.fromstring(file_content) root = ET.fromstring(content)
text_nodes = root.findall('.//main:t', ns) for node in root.findall('.//main:t', self.ns):
for node in text_nodes: if node.text in translation_map:
original_text = node.text trans = translation_map[node.text]
if original_text in translation_map: if self.insert_mode == "append":
translated_text = translation_map[original_text] node.text = node.text + self.separator + trans
if self.insert_mode == "replace":
node.text = translated_text
elif self.insert_mode == "append":
node.text = original_text + self.separator + translated_text
elif self.insert_mode == "prepend": elif self.insert_mode == "prepend":
node.text = translated_text + self.separator + original_text node.text = trans + self.separator + node.text
file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True) else:
node.text = trans
content = ET.tostring(root, encoding='utf-8', xml_declaration=True)
# --- 2. 处理表格定义文件 ---
elif item.filename.startswith("xl/tables/table"): elif item.filename.startswith("xl/tables/table"):
root = ET.fromstring(file_content) root = ET.fromstring(content)
table_columns = root.findall('.//main:tableColumn', ns) for col in root.findall('.//main:tableColumn', self.ns):
for col in table_columns: orig = col.get('name')
original_name = col.get('name') if orig in translation_map:
if original_name in translation_map: trans = translation_map[orig]
translated_name = translation_map[original_name] if self.insert_mode == "append":
if self.insert_mode == "replace": col.set('name', orig + self.separator + trans)
col.set('name', translated_name)
elif self.insert_mode == "append":
col.set('name', original_name + self.separator + translated_name)
elif self.insert_mode == "prepend": elif self.insert_mode == "prepend":
col.set('name', translated_name + self.separator + original_name) col.set('name', trans + self.separator + orig)
file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True) else:
col.set('name', trans)
output_zip.writestr(item, file_content) content = ET.tostring(root, encoding='utf-8', xml_declaration=True)
zf_out.writestr(item, content)
return output_zip_io.getvalue() return output_zip_io.getvalue()
except Exception as e:
except (zipfile.BadZipFile, ET.ParseError) as e: self.logger.error(f"XML重构失败: {e}")
self.logger.error(f"处理XLSX文件失败: {e}. 返回原始文件。")
return original_content_bytes return original_content_bytes
# =========================================================================
# 主入口
# =========================================================================
def translate(self, document: Document) -> Self: def translate(self, document: Document) -> Self:
original_texts = self._get_texts_to_translate(document) if self.translate_regions:
original_texts = self._get_texts_xml_regions(document)
else:
original_texts = self._get_texts_xml_all(document)
if not original_texts: if not original_texts:
print("\n在文件中没有找到需要翻译的文本内容。") print(f"\n找到需要翻译的文本 (模式: {'区域' if self.translate_regions else '全文档'}).")
return self return self
if self.glossary_agent: if self.glossary_agent:
@@ -174,15 +394,21 @@ class XlsxTranslator(AiTranslator):
translation_map = dict(zip(original_texts, translated_texts)) translation_map = dict(zip(original_texts, translated_texts))
document.content = self._rebuild_xlsx_with_translated_content(document.content, translation_map) if self.translate_regions:
document.content = self._rebuild_xml_regions(document.content, translation_map)
else:
document.content = self._rebuild_xml_all(document.content, translation_map)
return self return self
async def translate_async(self, document: Document) -> Self: async def translate_async(self, document: Document) -> Self:
original_texts = await asyncio.to_thread(self._get_texts_to_translate, document) if self.translate_regions:
original_texts = await asyncio.to_thread(self._get_texts_xml_regions, document)
else:
original_texts = await asyncio.to_thread(self._get_texts_xml_all, document)
if not original_texts: if not original_texts:
print("\n在文件中没有找到需要翻译的文本内容。") print(f"\n找到需要翻译的文本 (模式: {'区域' if self.translate_regions else '全文档'}).")
return self return self
if self.glossary_agent: if self.glossary_agent:
@@ -197,7 +423,9 @@ class XlsxTranslator(AiTranslator):
translation_map = dict(zip(original_texts, translated_texts)) translation_map = dict(zip(original_texts, translated_texts))
document.content = await asyncio.to_thread(self._rebuild_xlsx_with_translated_content, document.content, if self.translate_regions:
translation_map) document.content = await asyncio.to_thread(self._rebuild_xml_regions, document.content, translation_map)
else:
document.content = await asyncio.to_thread(self._rebuild_xml_all, document.content, translation_map)
return self return self