diff --git a/docutranslate/translator/ai_translator/xlsx_translator.py b/docutranslate/translator/ai_translator/xlsx_translator.py index 8aeb160..a9d2705 100644 --- a/docutranslate/translator/ai_translator/xlsx_translator.py +++ b/docutranslate/translator/ai_translator/xlsx_translator.py @@ -3,12 +3,13 @@ import asyncio from dataclasses import dataclass from io import BytesIO -from typing import Self, Literal, List, Optional +from typing import Self, Literal, List, Optional, Dict, Tuple, Set import zipfile +import re # 引入正则用于解析简写 import xml.etree.ElementTree as ET -import openpyxl # openpyxl 仍然保留,以备将来可能需要混合模式或用于其他目的 -from openpyxl.cell import Cell +# 仅导入 openpyxl 的工具函数用于坐标计算,不加载 workbook 对象 +from openpyxl.utils.cell import coordinate_to_tuple, range_boundaries from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent from docutranslate.ir.document import Document @@ -20,8 +21,8 @@ class XlsxTranslatorConfig(AiTranslatorConfig): insert_mode: Literal["replace", "append", "prepend"] = "replace" separator: str = "\n" # 指定翻译区域列表。 - # 示例: ["Sheet1!A1:B10", "C:D", "E5"] - # 如果不指定表名 (如 "C:D"),则应用于所有表。 + # 示例: ["Sheet1!A1:B10", "C", "3"] (支持简写: C代表C列, 3代表第3行) + # 如果不指定表名 (如 "C"),则应用于所有表。 # 如果为 None 或空列表,则翻译整个文件中的所有文本。 translate_regions: Optional[List[str]] = None @@ -53,113 +54,332 @@ class XlsxTranslator(AiTranslator): self.separator = config.separator self.translate_regions = config.translate_regions - def _get_texts_to_translate(self, document: Document) -> List[str]: - """ - 【已修改】通过直接解析内部XML文件来识别需要翻译的文本。 - 这种方法可以正确处理包含富文本的单元格,并确保与重建逻辑一致,但不支持按区域翻译。 - """ - if self.translate_regions: - self.logger.warning("当前文本提取方法直接解析XML,不支持 'translate_regions'。将翻译文件中的所有文本内容。") + # 命名空间定义 + self.ns = { + 'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main', + 'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships' + } + ET.register_namespace('', self.ns['main']) - texts_to_translate = set() - ns = {'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'} + # ========================================================================= + # 辅助方法:无需加载 Workbook 即可解析结构 + # ========================================================================= + def _get_shared_strings(self, zf: zipfile.ZipFile) -> List[str]: + """解析共享字符串表,返回字符串列表。""" + if "xl/sharedStrings.xml" not in zf.namelist(): + return [] + + shared_strings = [] + with zf.open("xl/sharedStrings.xml") as f: + context = ET.iterparse(f, events=("end",)) + for event, elem in context: + if elem.tag.endswith('}si'): # shared item + texts = [t.text for t in elem.findall('.//main:t', self.ns) if t.text] + shared_strings.append("".join(texts)) + elem.clear() + return shared_strings + + def _get_sheet_mapping(self, zf: zipfile.ZipFile) -> Dict[str, str]: + """ + 获取 Sheet 名称到文件路径的映射。 + 例如: {'Sheet1': 'xl/worksheets/sheet1.xml'} + """ + sheet_name_to_rid = {} try: - with zipfile.ZipFile(BytesIO(document.content), 'r') as original_zip: - # --- 1. 处理共享字符串 (sharedStrings.xml) --- - # 这是所有文本(包括富文本片段)的主要存储位置。 - if "xl/sharedStrings.xml" in original_zip.namelist(): - with original_zip.open("xl/sharedStrings.xml") as f: - root = ET.fromstring(f.read()) - # 查找所有 元素,无论它们在哪个层级,这能正确捕获富文本片段 - text_nodes = root.findall('.//main:t', ns) - for node in text_nodes: - # 确保节点有文本内容且不是纯粹的空白 - if node.text and node.text.strip(): - texts_to_translate.add(node.text) + with zf.open("xl/workbook.xml") as f: + root = ET.fromstring(f.read()) + for sheet in root.findall(".//main:sheet", self.ns): + name = sheet.get("name") + rid = sheet.get(f"{{{self.ns['r']}}}id") + if name and rid: + sheet_name_to_rid[name] = rid + except Exception: + return {} - # --- 2. 处理表格标题 (tableX.xml) --- - # 表格的列名不存储在 sharedStrings.xml 中,需要单独处理。 - for item in original_zip.infolist(): - if item.filename.startswith("xl/tables/table"): - with original_zip.open(item.filename) as f: - root = ET.fromstring(f.read()) - table_columns = root.findall('.//main:tableColumn', ns) - for col in table_columns: - original_name = col.get('name') - if original_name and original_name.strip(): - texts_to_translate.add(original_name) + rid_to_target = {} + try: + with zf.open("xl/_rels/workbook.xml.rels") as f: + tree = ET.parse(f) + root = tree.getroot() + for child in root: + rid = child.get("Id") + target = child.get("Target") + if rid and target: + if not target.startswith("/"): + target = "xl/" + target + rid_to_target[rid] = target + except Exception: + return {} - except Exception as e: - self.logger.error(f"直接解析XLSX的XML文件失败: {e}") + mapping = {} + for name, rid in sheet_name_to_rid.items(): + if rid in rid_to_target: + mapping[name] = rid_to_target[rid] + + return mapping + + def _normalize_range(self, range_str: str) -> str: + """ + 将用户输入的简写转换为标准范围格式。 + "C" -> "C:C" + "3" -> "3:3" + "A1" -> "A1:A1" (openpyxl range_boundaries 实际上支持 A1,但这里统一处理更安全) + """ + # 纯字母 (例如 "C", "AA") -> 整列 + if re.match(r"^[A-Za-z]+$", range_str): + return f"{range_str}:{range_str}" + # 纯数字 (例如 "3", "10") -> 整行 + if re.match(r"^\d+$", range_str): + return f"{range_str}:{range_str}" + return range_str + + def _parse_region_boundaries(self, sheet_mapping: Dict[str, str]) -> Dict[str, List[Tuple]]: + """ + 解析配置的 translate_regions。 + 返回: { 'xl/worksheets/sheet1.xml': [(min_col, min_row, max_col, max_row), ...], ... } + """ + if not self.translate_regions: + return {} + + region_map = {} # filename -> list of boundaries + global_regions = [] # list of boundaries for all sheets + + for region in self.translate_regions: + sheet_name = None + raw_range = region.strip() + + if "!" in raw_range: + parts = raw_range.split("!", 1) + sheet_name = parts[0].strip("'") + range_part = self._normalize_range(parts[1]) + else: + range_part = self._normalize_range(raw_range) + + try: + # boundaries: (min_col, min_row, max_col, max_row) + boundaries = range_boundaries(range_part) + + if sheet_name: + filename = sheet_mapping.get(sheet_name) + if filename: + if filename not in region_map: + region_map[filename] = [] + region_map[filename].append(boundaries) + else: + global_regions.append(boundaries) + except Exception as e: + self.logger.warning(f"无法解析区域 '{region}': {e}") + + # 将全局区域添加到所有已知 Sheet + if global_regions: + all_files = set(sheet_mapping.values()) + for f in all_files: + if f not in region_map: + region_map[f] = [] + region_map[f].extend(global_regions) + + return region_map + + def _is_in_boundaries(self, col: int, row: int, boundaries_list: List[Tuple]) -> bool: + """检查坐标 (col, row) 是否在给定的边界列表中。""" + for (min_col, min_row, max_col, max_row) in boundaries_list: + if min_col is not None and col < min_col: continue + if min_row is not None and row < min_row: continue + if max_col is not None and col > max_col: continue + if max_row is not None and row > max_row: continue + return True + return False + + # ========================================================================= + # 高效 XML 区域提取与重构 + # ========================================================================= + + def _get_texts_xml_regions(self, document: Document) -> List[str]: + """使用纯 XML 解析(结合 SharedStrings)提取指定区域文本。""" + texts_to_translate = set() + + with zipfile.ZipFile(BytesIO(document.content), 'r') as zf: + shared_strings = self._get_shared_strings(zf) + if not shared_strings: + return [] + + sheet_mapping = self._get_sheet_mapping(zf) + boundaries_map = self._parse_region_boundaries(sheet_mapping) + + for filename, boundaries in boundaries_map.items(): + if filename not in zf.namelist(): + continue + + with zf.open(filename) as f: + context = ET.iterparse(f, events=("end",)) + for event, elem in context: + if elem.tag.endswith('}c'): # Cell + r_attr = elem.get('r') # e.g. "C5" + t_attr = elem.get('t') # e.g. "s" + + if r_attr and t_attr == 's': + try: + # 【修正】coordinate_to_tuple 返回 (row, col) + row, col = coordinate_to_tuple(r_attr) + if self._is_in_boundaries(col, row, boundaries): + v_node = elem.find('main:v', self.ns) + if v_node is not None and v_node.text: + idx = int(v_node.text) + if 0 <= idx < len(shared_strings): + texts_to_translate.add(shared_strings[idx]) + except Exception: + pass + elem.clear() return list(texts_to_translate) - def _rebuild_xlsx_with_translated_content(self, original_content_bytes: bytes, translation_map: dict) -> bytes: - """ - 【无需修改】通过替换 sharedStrings.xml 和 tableX.xml 中的文本内容来重构 XLSX 文件。 - 此函数的逻辑与新的读取逻辑完全匹配。 - """ - # 注册命名空间以正确解析和生成XML - ns = { - 'main': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main' - } - ET.register_namespace('', ns['main']) - - original_zip_io = BytesIO(original_content_bytes) + def _rebuild_xml_regions(self, original_content_bytes: bytes, translation_map: dict) -> bytes: + """使用纯 XML 重构,修正了坐标解包顺序。""" output_zip_io = BytesIO() + with zipfile.ZipFile(BytesIO(original_content_bytes), 'r') as zf_in: + with zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as zf_out: + + shared_strings = self._get_shared_strings(zf_in) + sheet_mapping = self._get_sheet_mapping(zf_in) + boundaries_map = self._parse_region_boundaries(sheet_mapping) + + for item in zf_in.infolist(): + if item.filename in boundaries_map: + boundaries = boundaries_map[item.filename] + + with zf_in.open(item.filename) as f: + tree = ET.parse(f) + root = tree.getroot() + + cells_modified = False + for cell in root.findall(".//main:c", self.ns): + r_attr = cell.get('r') + t_attr = cell.get('t') + + if r_attr and t_attr == 's': + try: + # 【修正】coordinate_to_tuple 返回 (row, col) + row, col = coordinate_to_tuple(r_attr) + + if self._is_in_boundaries(col, row, boundaries): + v_node = cell.find('main:v', self.ns) + if v_node is not None and v_node.text: + idx = int(v_node.text) + if 0 <= idx < len(shared_strings): + original_text = shared_strings[idx] + + if original_text in translation_map: + translated_text = translation_map[original_text] + + final_text = translated_text + if self.insert_mode == "append": + final_text = original_text + self.separator + translated_text + elif self.insert_mode == "prepend": + final_text = translated_text + self.separator + original_text + + # 转换为 inlineStr + cell.set('t', 'inlineStr') + cell.remove(v_node) + is_node = ET.Element(f"{{{self.ns['main']}}}is") + t_node = ET.SubElement(is_node, f"{{{self.ns['main']}}}t") + t_node.text = final_text + cell.append(is_node) + + cells_modified = True + except Exception: + pass + + if cells_modified: + xml_str = ET.tostring(root, encoding='utf-8', xml_declaration=True) + zf_out.writestr(item, xml_str) + else: + zf_out.writestr(item, zf_in.read(item.filename)) + else: + zf_out.writestr(item, zf_in.read(item.filename)) + + return output_zip_io.getvalue() + + # ========================================================================= + # 原有全文档逻辑 (针对全文档翻译保持极致速度) + # ========================================================================= + + def _get_texts_xml_all(self, document: Document) -> List[str]: + texts_to_translate = set() try: - with zipfile.ZipFile(original_zip_io, 'r') as original_zip: - with zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as output_zip: - for item in original_zip.infolist(): - file_content = original_zip.read(item.filename) + with zipfile.ZipFile(BytesIO(document.content), 'r') as zf: + if "xl/sharedStrings.xml" in zf.namelist(): + with zf.open("xl/sharedStrings.xml") as f: + root = ET.fromstring(f.read()) + for node in root.findall('.//main:t', self.ns): + if node.text and node.text.strip(): + texts_to_translate.add(node.text) + + for item in zf.infolist(): + if item.filename.startswith("xl/tables/table"): + with zf.open(item.filename) as f: + root = ET.fromstring(f.read()) + for col in root.findall('.//main:tableColumn', self.ns): + if col.get('name'): + texts_to_translate.add(col.get('name')) + except Exception as e: + self.logger.error(f"XML解析失败: {e}") + return list(texts_to_translate) + + def _rebuild_xml_all(self, original_content_bytes: bytes, translation_map: dict) -> bytes: + output_zip_io = BytesIO() + try: + with zipfile.ZipFile(BytesIO(original_content_bytes), 'r') as zf_in: + with zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as zf_out: + for item in zf_in.infolist(): + content = zf_in.read(item.filename) - # --- 1. 处理共享字符串文件 --- if item.filename == "xl/sharedStrings.xml": - root = ET.fromstring(file_content) - text_nodes = root.findall('.//main:t', ns) - for node in text_nodes: - original_text = node.text - if original_text in translation_map: - translated_text = translation_map[original_text] - if self.insert_mode == "replace": - node.text = translated_text - elif self.insert_mode == "append": - node.text = original_text + self.separator + translated_text + root = ET.fromstring(content) + for node in root.findall('.//main:t', self.ns): + if node.text in translation_map: + trans = translation_map[node.text] + if self.insert_mode == "append": + node.text = node.text + self.separator + trans elif self.insert_mode == "prepend": - node.text = translated_text + self.separator + original_text - file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True) + node.text = trans + self.separator + node.text + else: + node.text = trans + content = ET.tostring(root, encoding='utf-8', xml_declaration=True) - # --- 2. 处理表格定义文件 --- elif item.filename.startswith("xl/tables/table"): - root = ET.fromstring(file_content) - table_columns = root.findall('.//main:tableColumn', ns) - for col in table_columns: - original_name = col.get('name') - if original_name in translation_map: - translated_name = translation_map[original_name] - if self.insert_mode == "replace": - col.set('name', translated_name) - elif self.insert_mode == "append": - col.set('name', original_name + self.separator + translated_name) + root = ET.fromstring(content) + for col in root.findall('.//main:tableColumn', self.ns): + orig = col.get('name') + if orig in translation_map: + trans = translation_map[orig] + if self.insert_mode == "append": + col.set('name', orig + self.separator + trans) elif self.insert_mode == "prepend": - col.set('name', translated_name + self.separator + original_name) - file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True) - - output_zip.writestr(item, file_content) + col.set('name', trans + self.separator + orig) + else: + col.set('name', trans) + content = ET.tostring(root, encoding='utf-8', xml_declaration=True) + zf_out.writestr(item, content) return output_zip_io.getvalue() - - except (zipfile.BadZipFile, ET.ParseError) as e: - self.logger.error(f"处理XLSX文件失败: {e}. 返回原始文件。") + except Exception as e: + self.logger.error(f"XML重构失败: {e}") return original_content_bytes + # ========================================================================= + # 主入口 + # ========================================================================= + def translate(self, document: Document) -> Self: - original_texts = self._get_texts_to_translate(document) + if self.translate_regions: + original_texts = self._get_texts_xml_regions(document) + else: + original_texts = self._get_texts_xml_all(document) if not original_texts: - print("\n在文件中没有找到需要翻译的文本内容。") + print(f"\n未找到需要翻译的文本 (模式: {'区域' if self.translate_regions else '全文档'}).") return self if self.glossary_agent: @@ -174,15 +394,21 @@ class XlsxTranslator(AiTranslator): translation_map = dict(zip(original_texts, translated_texts)) - document.content = self._rebuild_xlsx_with_translated_content(document.content, translation_map) + if self.translate_regions: + document.content = self._rebuild_xml_regions(document.content, translation_map) + else: + document.content = self._rebuild_xml_all(document.content, translation_map) return self async def translate_async(self, document: Document) -> Self: - original_texts = await asyncio.to_thread(self._get_texts_to_translate, document) + if self.translate_regions: + original_texts = await asyncio.to_thread(self._get_texts_xml_regions, document) + else: + original_texts = await asyncio.to_thread(self._get_texts_xml_all, document) if not original_texts: - print("\n在文件中没有找到需要翻译的文本内容。") + print(f"\n未找到需要翻译的文本 (模式: {'区域' if self.translate_regions else '全文档'}).") return self if self.glossary_agent: @@ -197,7 +423,9 @@ class XlsxTranslator(AiTranslator): translation_map = dict(zip(original_texts, translated_texts)) - document.content = await asyncio.to_thread(self._rebuild_xlsx_with_translated_content, document.content, - translation_map) + if self.translate_regions: + document.content = await asyncio.to_thread(self._rebuild_xml_regions, document.content, translation_map) + else: + document.content = await asyncio.to_thread(self._rebuild_xml_all, document.content, translation_map) return self \ No newline at end of file