修复xlsx工作流不能保留图片的问题

This commit is contained in:
xunbu
2025-10-30 19:03:10 +08:00
parent 27c3202534
commit d292f2bbd2

View File

@@ -4,6 +4,8 @@ import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from io import BytesIO from io import BytesIO
from typing import Self, Literal, List, Optional from typing import Self, Literal, List, Optional
import zipfile
import xml.etree.ElementTree as ET
import openpyxl import openpyxl
from openpyxl.cell import Cell from openpyxl.cell import Cell
@@ -48,139 +50,171 @@ class XlsxTranslator(AiTranslator):
self.translate_agent = SegmentsTranslateAgent(agent_config) self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode self.insert_mode = config.insert_mode
self.separator = config.separator self.separator = config.separator
# --- 新增功能 ---
self.translate_regions = config.translate_regions self.translate_regions = config.translate_regions
def _pre_translate(self, document: Document): def _get_texts_to_translate(self, document: Document) -> List[str]:
workbook = openpyxl.load_workbook(BytesIO(document.content)) """使用 openpyxl 识别指定区域内需要翻译的文本。"""
cells_to_translate = [] texts_to_translate = set()
# --- 步骤 1: 根据是否指定区域,收集需要翻译的文本单元格 ---
# 如果未指定翻译区域,则沿用旧逻辑,翻译所有单元格
if not self.translate_regions: # 也处理 None 或空列表的情况
for sheet in workbook.worksheets:
for row in sheet.iter_rows():
for cell in row:
if isinstance(cell.value, str) and cell.data_type == "s":
cells_to_translate.append({
"sheet_name": sheet.title,
"coordinate": cell.coordinate,
"original_text": cell.value,
})
# 如果指定了翻译区域,则只在这些区域内查找
else:
processed_coordinates = set()
regions_by_sheet = {}
all_sheet_regions = []
for region in self.translate_regions:
if '!' in region:
sheet_name, cell_range = region.split('!', 1)
if sheet_name not in regions_by_sheet:
regions_by_sheet[sheet_name] = []
regions_by_sheet[sheet_name].append(cell_range)
else:
all_sheet_regions.append(region)
for sheet in workbook.worksheets:
sheet_specific_ranges = regions_by_sheet.get(sheet.title, [])
total_ranges_for_this_sheet = sheet_specific_ranges + all_sheet_regions
if not total_ranges_for_this_sheet:
continue
for cell_range in total_ranges_for_this_sheet:
try:
cells_in_range = sheet[cell_range]
# --- START: 这是修改的关键部分 ---
# 无论返回的是单个cell、一维元组(行/列)还是二维元组(矩形),都将其展平为一维列表
flat_cells = []
if isinstance(cells_in_range, Cell):
flat_cells.append(cells_in_range)
elif isinstance(cells_in_range, tuple):
for item in cells_in_range:
if isinstance(item, Cell):
flat_cells.append(item) # 处理一维元组
elif isinstance(item, tuple):
for cell in item: # 处理二维元组
flat_cells.append(cell)
# --- END: 修改结束 ---
# 使用简化后的单层循环
for cell in flat_cells:
full_coordinate = (sheet.title, cell.coordinate)
if full_coordinate in processed_coordinates:
continue
if isinstance(cell.value, str) and cell.data_type == "s":
cell_info = {
"sheet_name": sheet.title,
"coordinate": cell.coordinate,
"original_text": cell.value,
}
cells_to_translate.append(cell_info)
processed_coordinates.add(full_coordinate)
except Exception as e:
self.logger.warning(f"跳过无效的区域 '{cell_range}' 在工作表 '{sheet.title}'. 错误: {e}")
original_texts = [cell["original_text"] for cell in cells_to_translate]
return workbook, cells_to_translate, original_texts
def _after_translate(self, workbook, cells_to_translate, translated_texts, original_texts):
for i, cell_info in enumerate(cells_to_translate):
sheet_name = cell_info["sheet_name"]
coordinate = cell_info["coordinate"]
translated_text = translated_texts[i]
original_text = original_texts[i]
# 定位到工作表和单元格
sheet = workbook[sheet_name]
if self.insert_mode == "replace":
sheet[coordinate] = translated_text
elif self.insert_mode == "append":
sheet[coordinate] = original_text + self.separator + translated_text
elif self.insert_mode == "prepend":
sheet[coordinate] = translated_text + self.separator + original_text
else:
self.logger.error("不正确的XlsxTranslatorConfig参数")
workbook_output_stream = BytesIO()
# 保存修改后的工作簿到新文件
try: try:
workbook.save(workbook_output_stream) # 使用 data_only=True 来获取单元格的计算值,而不是公式
finally: workbook = openpyxl.load_workbook(BytesIO(document.content), data_only=True)
# 如果未指定区域,则翻译所有文本
if not self.translate_regions:
for sheet in workbook.worksheets:
for row in sheet.iter_rows():
for cell in row:
# 仅处理共享字符串类型,这是最常见的文本存储方式
if isinstance(cell.value, str) and cell.data_type == "s":
texts_to_translate.add(cell.value)
# 同时也要检查表格的标题
for sheet in workbook.worksheets:
for table in sheet._tables:
for column in table.tableColumns:
if column.name:
texts_to_translate.add(column.name)
# 如果指定了区域
else:
processed_coordinates = set()
regions_by_sheet = {}
all_sheet_regions = []
for region in self.translate_regions:
if '!' in region:
sheet_name, cell_range = region.split('!', 1)
# 支持带引号的工作表名称
sheet_name = sheet_name.strip("'")
if sheet_name not in regions_by_sheet:
regions_by_sheet[sheet_name] = []
regions_by_sheet[sheet_name].append(cell_range)
else:
all_sheet_regions.append(region)
for sheet in workbook.worksheets:
sheet_specific_ranges = regions_by_sheet.get(sheet.title, [])
total_ranges_for_this_sheet = sheet_specific_ranges + all_sheet_regions
if not total_ranges_for_this_sheet:
continue
# 检查此区域内的表格标题
for table in sheet._tables:
# openpyxl 没有提供简单的方法来检查表格是否与区域相交
# 为简单起见,我们假设如果指定了工作表,则翻译该工作表上的所有表格标题
for column in table.tableColumns:
if column.name:
texts_to_translate.add(column.name)
for cell_range in total_ranges_for_this_sheet:
try:
cells_in_range = sheet[cell_range]
flat_cells = []
if isinstance(cells_in_range, Cell):
flat_cells.append(cells_in_range)
elif isinstance(cells_in_range, tuple):
for item in cells_in_range:
if isinstance(item, Cell):
flat_cells.append(item)
elif isinstance(item, tuple):
flat_cells.extend(item)
for cell in flat_cells:
if isinstance(cell.value, str) and cell.data_type == "s":
texts_to_translate.add(cell.value)
except Exception as e:
self.logger.warning(f"跳过无效的区域 '{cell_range}' 在工作表 '{sheet.title}'. 错误: {e}")
workbook.close() workbook.close()
return workbook_output_stream.getvalue() except Exception as e:
self.logger.error(f"使用 openpyxl 预处理文件失败: {e}")
return list(texts_to_translate)
def _rebuild_xlsx_with_translated_content(self, original_content_bytes: bytes, translation_map: dict) -> bytes:
    """Rebuild the XLSX package with translated text, copying all other parts verbatim.

    Only ``xl/sharedStrings.xml`` (``<t>`` nodes) and ``xl/tables/table*.xml``
    (``tableColumn`` ``name`` attributes) are edited. Every other archive
    member (images, drawings, styles, ...) is copied byte-for-byte, and an
    editable part in which nothing matched is also written back unchanged.

    Args:
        original_content_bytes: Raw bytes of the source ``.xlsx`` archive.
        translation_map: Maps an original string to its translation. Entries
            whose translation is not a ``str`` are ignored.

    Returns:
        The bytes of the rebuilt archive. The original bytes are returned
        unchanged when there is nothing to translate, when ``self.insert_mode``
        is not one of ``replace`` / ``append`` / ``prepend``, or when the
        archive or one of its XML parts cannot be parsed.
    """
    main_ns = 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'
    ns = {'main': main_ns}
    xml_space_attr = '{http://www.w3.org/XML/1998/namespace}space'
    insert_mode = self.insert_mode
    separator = self.separator

    if insert_mode not in ("replace", "append", "prepend"):
        # Nothing would be substituted, so do not rewrite the archive at all.
        self.logger.error("不正确的XlsxTranslatorConfig参数")
        return original_content_bytes
    if not translation_map:
        return original_content_bytes

    def combine(original_text):
        """Return the text to store for *original_text*, or None to leave it alone."""
        if original_text is None:
            return None
        translated = translation_map.get(original_text)
        if not isinstance(translated, str):
            return None
        if insert_mode == "replace":
            return translated
        if insert_mode == "append":
            return original_text + separator + translated
        return translated + separator + original_text

    def parse_keeping_prefixes(xml_bytes):
        """Parse *xml_bytes* and register the namespace prefixes it declares.

        ElementTree serializes unregistered namespaces as ``ns0``, ``ns1``...
        which breaks attributes such as ``mc:Ignorable="xr xr3"`` that refer
        to prefixes by name. The registry is process-global, as it already
        was for the default namespace.
        """
        # Fallback so the main namespace stays un-prefixed even if undeclared.
        ET.register_namespace('', main_ns)
        events = ET.iterparse(BytesIO(xml_bytes), events=('start-ns',))
        for _event, (prefix, uri) in events:
            try:
                ET.register_namespace(prefix or '', uri)
            except ValueError:
                # Prefixes shaped like "ns0" are reserved by ElementTree.
                pass
        return events.root

    output_zip_io = BytesIO()
    try:
        with zipfile.ZipFile(BytesIO(original_content_bytes), 'r') as original_zip, \
                zipfile.ZipFile(output_zip_io, 'w', zipfile.ZIP_DEFLATED) as output_zip:
            for item in original_zip.infolist():
                file_content = original_zip.read(item.filename)

                # --- 1. Shared strings ---
                if item.filename == "xl/sharedStrings.xml":
                    root = parse_keeping_prefixes(file_content)
                    changed = False
                    for node in root.findall('.//main:t', ns):
                        new_text = combine(node.text)
                        if new_text is None:
                            continue
                        node.text = new_text
                        if new_text != new_text.strip():
                            # Without this, consumers may trim edge whitespace.
                            node.set(xml_space_attr, 'preserve')
                        changed = True
                    if changed:
                        file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True)

                # --- 2. Table definitions (column header names) ---
                elif item.filename.startswith("xl/tables/table"):
                    root = parse_keeping_prefixes(file_content)
                    changed = False
                    for col in root.findall('.//main:tableColumn', ns):
                        new_name = combine(col.get('name'))
                        if new_name is None:
                            continue
                        col.set('name', new_name)
                        changed = True
                    if changed:
                        file_content = ET.tostring(root, encoding='utf-8', xml_declaration=True)

                # Passing the original ZipInfo keeps the member name and metadata.
                output_zip.writestr(item, file_content)
        # Read the buffer only after the writer closed and wrote the central directory.
        return output_zip_io.getvalue()
    except (zipfile.BadZipFile, ET.ParseError) as e:
        self.logger.error(f"处理XLSX文件失败: {e}. 返回原始文件。")
        return original_content_bytes
def translate(self, document: Document) -> Self: def translate(self, document: Document) -> Self:
original_texts = self._get_texts_to_translate(document)
workbook, cells_to_translate, original_texts = self._pre_translate(document) if not original_texts:
if not cells_to_translate:
print("\n在指定区域中没有找到需要翻译的纯文本内容。") print("\n在指定区域中没有找到需要翻译的纯文本内容。")
workbook.close()
return self return self
if self.glossary_agent: if self.glossary_agent:
self.glossary_dict_gen = self.glossary_agent.send_segments(original_texts, self.chunk_size) self.glossary_dict_gen = self.glossary_agent.send_segments(original_texts, self.chunk_size)
if self.translate_agent: if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen) self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
# --- 步骤 2: 调用翻译函数 ---
if self.translate_agent: if self.translate_agent:
translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size) translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size)
else: else:
translated_texts = original_texts translated_texts = original_texts
document.content = self._after_translate(workbook, cells_to_translate, translated_texts, original_texts) translation_map = dict(zip(original_texts, translated_texts))
document.content = self._rebuild_xlsx_with_translated_content(document.content, translation_map)
return self return self
async def translate_async(self, document: Document) -> Self: async def translate_async(self, document: Document) -> Self:
original_texts = await asyncio.to_thread(self._get_texts_to_translate, document)
workbook, cells_to_translate, original_texts = await asyncio.to_thread(self._pre_translate, document) if not original_texts:
if not cells_to_translate:
print("\n在指定区域中没有找到需要翻译的纯文本内容。") print("\n在指定区域中没有找到需要翻译的纯文本内容。")
workbook.close()
return self return self
if self.glossary_agent: if self.glossary_agent:
@@ -188,11 +222,14 @@ class XlsxTranslator(AiTranslator):
if self.translate_agent: if self.translate_agent:
self.translate_agent.update_glossary_dict(self.glossary_dict_gen) self.translate_agent.update_glossary_dict(self.glossary_dict_gen)
# --- 步骤 2: 调用翻译函数 ---
if self.translate_agent: if self.translate_agent:
translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size) translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size)
else: else:
translated_texts = original_texts translated_texts = original_texts
document.content = await asyncio.to_thread(self._after_translate, workbook, cells_to_translate,
translated_texts, original_texts) translation_map = dict(zip(original_texts, translated_texts))
document.content = await asyncio.to_thread(self._rebuild_xlsx_with_translated_content, document.content,
translation_map)
return self return self