添加对.xlsx的翻译支持

This commit is contained in:
xunbu
2025-08-04 18:20:22 +08:00
parent 372c4d11b5
commit 141de321db
10 changed files with 278 additions and 42 deletions

View File

@@ -1,16 +1,18 @@
import json
from dataclasses import dataclass
from docutranslate.agents import AgentConfig, Agent
from docutranslate.utils.json_utils import flat_json_split
@dataclass
class JsonTranslateAgentConfig(AgentConfig):
class SegmentsTranslateAgentConfig(AgentConfig):
to_lang: str
custom_prompt: str | None = None
class JsonTranslateAgent(Agent):
def __init__(self, config: JsonTranslateAgentConfig):
class SegmentsTranslateAgent(Agent):
def __init__(self, config: SegmentsTranslateAgentConfig):
super().__init__(config)
self.system_prompt = f"""
# 角色
@@ -34,3 +36,26 @@ class JsonTranslateAgent(Agent):
"""
if config.custom_prompt:
self.system_prompt += "\n# 重要规则或背景【非常重要】\n" + config.custom_prompt + '\n'
def _segments_to_prompts(self, segments: list[str], chunk_size: int) -> tuple[dict[str, str], list[str]]:
    """Index the segments and serialize them into chunked JSON prompts.

    Returns a pair ``(indexed_originals, prompts)``: the mapping from the
    stringified list index to the original text, and one JSON string per
    chunk produced by ``flat_json_split``.
    """
    indexed_originals = {str(i): text for i, text in enumerate(segments)}
    chunks = flat_json_split(indexed_originals, chunk_size)
    # ensure_ascii=False keeps non-ASCII text (e.g. CJK) as real characters
    # instead of \uXXXX escapes, so the prompt is shorter and carries the
    # actual text to be translated.
    prompts = [json.dumps(chunk, ensure_ascii=False) for chunk in chunks]
    return indexed_originals, prompts

def _merge_translated_chunks(self, indexed_originals: dict[str, str], translated_chunks: list[str]) -> list[str]:
    """Merge the translated JSON chunks back into a list ordered like the input.

    The result always has exactly one entry per original segment. A segment
    keeps its original text when the response does not contain a string for
    its index. Keys that were never sent are ignored, so they can neither
    lengthen the result nor shift positions.

    Raises:
        json.JSONDecodeError: a chunk is not valid JSON.
        ValueError: a chunk decodes to something other than a JSON object.
    """
    merged = dict(indexed_originals)
    for chunk_str in translated_chunks:
        translated_part = json.loads(chunk_str)
        if not isinstance(translated_part, dict):
            raise ValueError(
                f"translated chunk must be a JSON object, got {type(translated_part).__name__}"
            )
        for key, value in translated_part.items():
            # Accept only answers for segments we actually sent, and only
            # string values; callers concatenate the results with strings.
            if key in merged and isinstance(value, str):
                merged[key] = value
    return list(merged.values())

def send_segments(self, segments: list[str], chunk_size: int) -> list[str]:
    """Translate ``segments`` synchronously.

    Args:
        segments: texts to translate.
        chunk_size: size limit handed to ``flat_json_split`` when the
            indexed segments are split into prompts.

    Returns:
        The translated texts, in the same order and count as ``segments``.
    """
    indexed_originals, prompts = self._segments_to_prompts(segments, chunk_size)
    translated_chunks = super().send_prompts(prompts=prompts)
    return self._merge_translated_chunks(indexed_originals, translated_chunks)

# TODO: make the coroutine granularity finer
async def send_segments_async(self, segments: list[str], chunk_size: int) -> list[str]:
    """Asynchronous counterpart of ``send_segments`` with the same contract."""
    indexed_originals, prompts = self._segments_to_prompts(segments, chunk_size)
    translated_chunks = await super().send_prompts_async(prompts=prompts)
    return self._merge_translated_chunks(indexed_originals, translated_chunks)

View File

View File

@@ -0,0 +1,8 @@
from docutranslate.exporter.base import Exporter
from docutranslate.ir.document import Document
# TODO: decide, depending on how things evolve, whether a dedicated Document type
# is needed here (the original note says "json"; presumably carried over from
# the JSON exporter - confirm).
class XlsxExporter(Exporter[Document]):
    """Common base for the exporters that consume an xlsx ``Document``."""

    def export(self, document: Document) -> Document:
        """Export ``document``; this base implementation has no body and subclasses override it."""

View File

@@ -0,0 +1,62 @@
from dataclasses import dataclass
from io import BytesIO
import jinja2
import openpyxl
from docutranslate.exporter.base import ExporterConfig
from docutranslate.exporter.xlsx.base import XlsxExporter
from docutranslate.ir.document import Document
from docutranslate.utils.resource_utils import resource_path
@dataclass
class Xlsx2HTMLExporterConfig(ExporterConfig):
    """Options for ``Xlsx2HTMLExporter``."""

    # Selects where the exported page takes Pico CSS from: True links the
    # stylesheet from a CDN, False inlines the bundled static/pico.css.
    cdn: bool = True
class Xlsx2HTMLExporter(XlsxExporter):
    """Render the active worksheet of an xlsx document as a standalone HTML page.

    Only ``workbook.active`` is rendered; other sheets are not included.
    """

    def __init__(self, config: Xlsx2HTMLExporterConfig | None = None):
        """Store the exporter options, falling back to the default config."""
        config = config or Xlsx2HTMLExporterConfig()
        super().__init__(config=config)
        self.cdn = config.cdn

    def export(self, document: Document) -> Document:
        """Build an HTML document containing the active sheet as a table.

        The first row is emitted as the table header and the remaining rows
        as the body. Empty cells become empty table cells, and every value is
        HTML-escaped so spreadsheet content cannot inject markup.

        Args:
            document: xlsx document whose ``content`` holds the workbook bytes.

        Returns:
            A new ``.html`` document with the same stem.
        """
        # Function-scope import: keeps this block self-contained.
        from html import escape

        def cell_html(tag: str, value) -> str:
            # None marks an empty cell; render it as empty rather than "None".
            text = "" if value is None else escape(str(value))
            return f"      <{tag}>{text}</{tag}>\n"

        workbook = openpyxl.load_workbook(BytesIO(document.content))
        try:
            sheet = workbook.active
            parts = ['<table border="1">\n', "  <thead>\n    <tr>\n"]
            # Row 1 is assumed to be the header row.
            parts.extend(cell_html("th", cell.value) for cell in sheet[1])
            parts.append("    </tr>\n  </thead>\n  <tbody>\n")
            for row in sheet.iter_rows(min_row=2):
                parts.append("    <tr>\n")
                parts.extend(cell_html("td", cell.value) for cell in row)
                parts.append("    </tr>\n")
            parts.append("  </tbody>\n</table>")
        finally:
            workbook.close()
        table = "".join(parts)

        html_template = resource_path("template/xlsx.html").read_text(encoding="utf-8")
        # The original read a bare, undefined name `cdn` here (NameError on
        # every export); the option lives on the instance.
        if self.cdn:
            pico = r'<link rel="stylesheet" href="https://s4.zstatic.net/ajax/libs/picocss/2.1.1/pico.min.css" integrity="sha512-+4kjFgVD0n6H3xt19Ox84B56MoS7srFn60tgdWFuO4hemtjhySKyW4LnftYZn46k3THUEiTTsbVjrHai+0MOFw==" crossorigin="anonymous" referrerpolicy="no-referrer" />'
        else:
            pico = f'<style>{resource_path("static/pico.css").read_text(encoding="utf-8")}</style>'
        render = jinja2.Template(html_template).render(
            # The template inserts values verbatim (pico/body are raw markup),
            # so the title must be escaped here.
            title=escape(str(document.stem)),
            pico=pico,
            body=table,
        )
        return Document.from_bytes(content=render.encode("utf-8"), suffix=".html", stem=document.stem)

View File

@@ -0,0 +1,8 @@
from docutranslate.exporter.txt.base import TXTExporter
from docutranslate.exporter.xlsx.base import XlsxExporter
from docutranslate.ir.document import Document
class Xlsx2XlsxExporter(XlsxExporter):
    """Pass-through exporter: the xlsx document is exported as xlsx unchanged."""

    def export(self, document: Document) -> Document:
        """Return a copy of ``document`` so the caller's instance is not shared."""
        duplicate = document.copy()
        return duplicate

View File

@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!-- Template placeholders supplied by the exporter: "title" (page title), "pico" (markup that loads the Pico CSS stylesheet) and "body" (the pre-built table markup). -->
<title>{{ title }}</title>
{{pico}}
<style>
html {
padding: 2vh 10vw;
font-size: 15px;
}
</style>
</head>
<body>
{{ body }}
</body>
</html>

View File

@@ -4,7 +4,7 @@ from typing import Self, Any
from jsonpath_ng.ext import parse
from docutranslate.agents.segments_agent import JsonTranslateAgentConfig, JsonTranslateAgent
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig
from docutranslate.translator.base import Translator
@@ -20,18 +20,18 @@ class JsonTranslator(Translator):
def __init__(self, config: JsonTranslatorConfig):
super().__init__(config=config)
self.chunk_size = config.chunk_size
agent_config = JsonTranslateAgentConfig(custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
baseurl=config.base_url,
key=config.api_key,
model_id=config.model_id,
system_prompt=None,
temperature=config.temperature,
thinking=config.thinking,
max_concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger)
self.translate_agent = JsonTranslateAgent(agent_config)
agent_config = SegmentsTranslateAgentConfig(custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
baseurl=config.base_url,
key=config.api_key,
model_id=config.model_id,
system_prompt=None,
temperature=config.temperature,
thinking=config.thinking,
max_concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.jsonpaths = config.json_paths
def _extract_matches(self, content: dict) -> list[Any]:
@@ -71,30 +71,6 @@ class JsonTranslator(Translator):
# 5. 按原始顺序返回翻译后的文本列表
return list(indexed_translated.values())
async def _translate_texts_in_batches_async(self, texts: list[str]) -> list[str]:
    """
    Pack the text list, split it into chunks, send the chunks for translation and return the results.
    This function encapsulates all details of the interaction with the translation agent.
    """
    # 1. Use each text's index as a unique ID, turning the list into a dict that is easy for the API to handle
    indexed_originals = {str(i): text for i, text in enumerate(texts)}
    # 2. Split the large dict into smaller chunks to satisfy the API's limits
    chunks = flat_json_split(indexed_originals, self.chunk_size)
    # 3. Serialize each chunk to a JSON string and send it for translation
    prompts = [json.dumps(chunk) for chunk in chunks]
    translated_chunks = await self.translate_agent.send_prompts_async(prompts)
    # 4. Merge the translated results back into a single dict
    # Start from a copy of the original dict so the structure is kept even if translation fails
    indexed_translated = indexed_originals.copy()
    for chunk_str in translated_chunks:
        translated_part = json.loads(chunk_str)
        indexed_translated.update(translated_part)
    # 5. Return the translated texts in their original order
    return list(indexed_translated.values())
def _update_content_with_translations(self, content: dict, matches: list[Any], translated_texts: list[str]):
"""
使用翻译后的文本更新原始JSON内容。
@@ -127,7 +103,7 @@ class JsonTranslator(Translator):
original_texts = [match.value for match in all_matches]
# 步骤 2: 批量翻译提取出的文本
translated_texts = self._translate_texts_in_batches(original_texts)
translated_texts = self.translate_agent.send_segments(original_texts,self.chunk_size)
# 健壮性检查:确保翻译回来的项目数量与发送的一致
if len(original_texts) != len(translated_texts):
@@ -141,6 +117,7 @@ class JsonTranslator(Translator):
return self
# todo:增加协程粒度
async def translate_async(self, document: Document) -> Self:
content = json.loads(document.content.decode())
@@ -154,7 +131,7 @@ class JsonTranslator(Translator):
original_texts = [match.value for match in all_matches]
# 步骤 2: 批量翻译提取出的文本
translated_texts = await self._translate_texts_in_batches_async(original_texts)
translated_texts = await self.translate_agent.send_segments_async(original_texts,self.chunk_size)
# 健壮性检查:确保翻译回来的项目数量与发送的一致
if len(original_texts) != len(translated_texts):

View File

@@ -0,0 +1,136 @@
from dataclasses import dataclass
from io import BytesIO
from typing import Self, Literal
import openpyxl
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig
from docutranslate.translator.base import Translator
@dataclass
class XlsxTranslatorConfig(AiTranslatorConfig):
    """Options for ``XlsxTranslator`` in addition to the shared AI-translator settings."""

    # Where the translation is written relative to a cell's original text:
    # "replace" overwrites it, "append" stores original + separator + translation,
    # "prepend" stores translation + separator + original.
    position: Literal["replace", "append", "prepend"] = "replace"
    # Text placed between original and translation for "append" / "prepend".
    separator: str = "\n"
class XlsxTranslator(Translator):
    """Translate the plain-text cells of an xlsx workbook.

    String cells of every worksheet are collected, sent to the segments
    translation agent in one batch, and written back according to
    ``position`` (replace / append / prepend). Formula cells are left alone.
    """

    _POSITIONS = ("replace", "append", "prepend")

    def __init__(self, config: XlsxTranslatorConfig):
        """Create the translation agent from ``config`` and store the xlsx options."""
        super().__init__(config=config)
        self.chunk_size = config.chunk_size
        agent_config = SegmentsTranslateAgentConfig(custom_prompt=config.custom_prompt,
                                                    to_lang=config.to_lang,
                                                    baseurl=config.base_url,
                                                    key=config.api_key,
                                                    model_id=config.model_id,
                                                    system_prompt=None,
                                                    temperature=config.temperature,
                                                    thinking=config.thinking,
                                                    max_concurrent=config.concurrent,
                                                    timeout=config.timeout,
                                                    logger=self.logger)
        self.translate_agent = SegmentsTranslateAgent(agent_config)
        self.position = config.position
        self.separator = config.separator

    def _pre_translate(self, document: Document):
        """Load the workbook and collect every text cell that needs translating.

        Returns:
            ``(workbook, cells_to_translate, original_texts)`` where each entry
            of ``cells_to_translate`` records the sheet name, the coordinate
            and the original text, and ``original_texts`` holds the texts in
            the same order.
        """
        workbook = openpyxl.load_workbook(BytesIO(document.content))
        cells_to_translate = []
        # Iterate workbook.worksheets rather than sheetnames: sheetnames also
        # lists chartsheets, which have no cells to iterate.
        for sheet in workbook.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    # The value must be a str AND the data type 's' (string),
                    # which excludes formulas ('f').
                    if isinstance(cell.value, str) and cell.data_type == "s":
                        cells_to_translate.append({
                            "sheet_name": sheet.title,
                            "coordinate": cell.coordinate,
                            "original_text": cell.value,
                        })
        original_texts = [cell["original_text"] for cell in cells_to_translate]
        return workbook, cells_to_translate, original_texts

    def _after_translate(self, workbook, cells_to_translate, translated_texts, original_texts) -> bytes:
        """Write the translations into the workbook and return it serialized.

        The workbook is closed before returning, whether or not saving
        succeeds. With an unknown ``position`` an error is logged once and
        the cells are left untouched.

        Raises:
            ValueError: fewer translations than cells were supplied.
        """
        try:
            if len(translated_texts) < len(cells_to_translate):
                raise ValueError(
                    f"expected {len(cells_to_translate)} translated texts, got {len(translated_texts)}"
                )
            if self.position not in self._POSITIONS:
                # Validate once instead of logging the same error per cell.
                self.logger.error("不正确的XlsxTranslatorConfig参数")
            else:
                for i, cell_info in enumerate(cells_to_translate):
                    translated_text = translated_texts[i]
                    original_text = original_texts[i]
                    if self.position == "replace":
                        new_value = translated_text
                    elif self.position == "append":
                        new_value = original_text + self.separator + translated_text
                    else:  # "prepend"
                        new_value = translated_text + self.separator + original_text
                    sheet = workbook[cell_info["sheet_name"]]
                    sheet[cell_info["coordinate"]] = new_value
            workbook_output_stream = BytesIO()
            workbook.save(workbook_output_stream)
        finally:
            workbook.close()
        return workbook_output_stream.getvalue()

    def translate(self, document: Document) -> Self:
        """Translate ``document`` in place (its ``content`` is replaced) and return ``self``."""
        workbook, cells_to_translate, original_texts = self._pre_translate(document)
        if not cells_to_translate:
            self.logger.info("文件中没有找到需要翻译的纯文本内容。")
            workbook.close()
            # Return self (not None) so the declared ``Self`` contract holds.
            return self
        translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size)
        document.content = self._after_translate(workbook, cells_to_translate, translated_texts, original_texts)
        return self

    async def translate_async(self, document: Document) -> Self:
        """Asynchronous variant of ``translate``; workbook parsing and saving run in worker threads."""
        # asyncio was previously imported only under the __main__ guard, so
        # this method raised NameError whenever the module was imported.
        import asyncio

        workbook, cells_to_translate, original_texts = await asyncio.to_thread(self._pre_translate, document)
        if not cells_to_translate:
            self.logger.info("文件中没有找到需要翻译的纯文本内容。")
            workbook.close()
            return self
        translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size)
        document.content = await asyncio.to_thread(self._after_translate, workbook, cells_to_translate,
                                                   translated_texts, original_texts)
        return self
if __name__ == '__main__':
    # Manual smoke test. Usage: <this script> [input.xlsx] [output.xlsx]
    import asyncio
    import os
    import sys
    from pathlib import Path

    # SECURITY: an API key used to be hard-coded on this line and was committed
    # to version control. That key must be treated as leaked and revoked; the
    # key is now read from the environment instead.
    api_key = os.environ.get("DOCUTRANSLATE_API_KEY")
    if not api_key:
        raise SystemExit("Set the DOCUTRANSLATE_API_KEY environment variable before running this demo.")

    # The machine-specific paths remain the defaults but can be overridden on
    # the command line.
    input_path = sys.argv[1] if len(sys.argv) > 1 else \
        r"C:\Users\jxgm\Desktop\translate\docutranslate\tests\files\工业互联分组表.xlsx"
    output_path = sys.argv[2] if len(sys.argv) > 2 else \
        r"C:\Users\jxgm\Desktop\translate\docutranslate\tests\output\output.xlsx"

    config = XlsxTranslatorConfig(
        base_url=r"https://open.bigmodel.cn/api/paas/v4/",
        api_key=api_key,
        model_id=r"glm-4-flash",
        to_lang="英文",
        position="append"
    )
    translator = XlsxTranslator(config)
    document = Document.from_path(input_path)

    async def run():
        await translator.translate_async(document)
        path = Path(output_path)
        path.write_bytes(document.content)
        print(f"已保存到{path.resolve()}")

    asyncio.run(run())

View File

@@ -8,6 +8,7 @@ dependencies = [
"markdown2>=2.5.3",
"fastapi[standard]>=0.115.12",
"jsonpath-ng>=1.7.0",
"openpyxl>=3.1.5",
]
dynamic = ["version"]

2
uv.lock generated
View File

@@ -285,6 +285,7 @@ dependencies = [
{ name = "httpx" },
{ name = "jsonpath-ng" },
{ name = "markdown2" },
{ name = "openpyxl" },
]
[package.optional-dependencies]
@@ -308,6 +309,7 @@ requires-dist = [
{ name = "jsonpath-ng", specifier = ">=1.7.0" },
{ name = "markdown2", specifier = ">=2.5.3" },
{ name = "opencv-python", marker = "extra == 'docling'", specifier = ">=4.11.0.86" },
{ name = "openpyxl", specifier = ">=3.1.5" },
]
provides-extras = ["docling"]