支持epub翻译

This commit is contained in:
xunbu
2025-08-09 10:41:56 +08:00
parent 4bc567e1db
commit feac68596b
11 changed files with 732 additions and 8 deletions

View File

@@ -28,7 +28,8 @@ from docutranslate.exporter.md.types import ConvertEngineType
from docutranslate.global_values.conditional_import import DOCLING_EXIST
from docutranslate.workflow.base import Workflow
from docutranslate.workflow.docx_workflow import DocxWorkflow, DocxWorkflowConfig
from docutranslate.workflow.interfaces import DocxExportable
from docutranslate.workflow.epub_workflow import EpubWorkflow, EpubWorkflowConfig
from docutranslate.workflow.interfaces import DocxExportable, EpubExportable
from docutranslate.workflow.interfaces import HTMLExportable, MDFormatsExportable, TXTExportable, JsonExportable, \
XlsxExportable, SrtExportable
from docutranslate.workflow.json_workflow import JsonWorkflow, JsonWorkflowConfig
@@ -52,6 +53,8 @@ from docutranslate.translator.ai_translator.docx_translator import DocxTranslato
from docutranslate.exporter.docx.docx2html_exporter import Docx2HTMLExporterConfig
from docutranslate.translator.ai_translator.srt_translator import SrtTranslatorConfig
from docutranslate.exporter.srt.srt2html_exporter import Srt2HTMLExporterConfig
from docutranslate.translator.ai_translator.epub_translator import EpubTranslatorConfig
from docutranslate.exporter.epub.epub2html_exporter import Epub2HTMLExporterConfig
# ------------------------------------
from docutranslate.logger import global_logger
@@ -73,6 +76,7 @@ WORKFLOW_DICT: Dict[str, Type[Workflow]] = {
"xlsx": XlsxWorkflow,
"docx": DocxWorkflow,
"srt": SrtWorkflow,
"epub": EpubWorkflow,
}
@@ -267,10 +271,22 @@ class SrtWorkflowParams(BaseWorkflowParams):
)
class EpubWorkflowParams(BaseWorkflowParams):
    """Request payload for the EPUB translation workflow.

    Selected by the discriminator value ``workflow_type == 'epub'``.  On top of
    the fields inherited from ``BaseWorkflowParams`` it carries the two options
    that control how translated text is written back next to the original.
    """

    workflow_type: Literal['epub'] = Field(..., description="指定使用EPUB的翻译工作流。")
    # How each translated segment is combined with its source text.
    insert_mode: Literal["replace", "append", "prepend"] = Field(
        "replace",
        description="翻译文本的插入模式。'replace':替换原文,'append':附加到原文后,'prepend':附加到原文前。",
    )
    # Only meaningful for the 'append' / 'prepend' modes.
    separator: str = Field(
        "\n",
        # The original description ran the two mode names together
        # ('append''prepend'); the missing conjunction is restored here.
        description="当 insert_mode 为 'append' 或 'prepend' 时,用于分隔原文和译文的分隔符。",
    )
# 3. 使用可辨识联合类型Discriminated Union将它们组合起来
TranslatePayload = Annotated[
Union[
MarkdownWorkflowParams, TextWorkflowParams, JsonWorkflowParams, XlsxWorkflowParams, DocxWorkflowParams, SrtWorkflowParams],
MarkdownWorkflowParams, TextWorkflowParams, JsonWorkflowParams, XlsxWorkflowParams, DocxWorkflowParams, SrtWorkflowParams, EpubWorkflowParams],
Field(discriminator='workflow_type')
]
@@ -278,7 +294,7 @@ TranslatePayload = Annotated[
# 4. 创建最终的请求体模型
class TranslateServiceRequest(BaseModel):
file_name: str = Field(..., description="上传的原始文件名,含扩展名。",
examples=["my_paper.pdf", "chapter1.txt", "data.xlsx", "video.srt"])
examples=["my_paper.pdf", "chapter1.txt", "data.xlsx", "video.srt", "my_book.epub"])
file_content: str = Field(..., description="Base64编码的文件内容。", examples=["JVBERi0xLjQK..."])
payload: TranslatePayload = Field(..., description="包含工作流类型和相应参数的载荷。")
@@ -363,6 +379,21 @@ class TranslateServiceRequest(BaseModel):
"insert_mode": "replace",
}
}
},
{
"summary": "EPUB 工作流示例",
"value": {
"file_name": "my_book.epub",
"file_content": "UEsDBBQAAAAIA... (base64-encoded epub)",
"payload": {
"workflow_type": "epub",
"base_url": "https://api.openai.com/v1",
"api_key": "sk-your-api-key-here",
"model_id": "gpt-4o",
"to_lang": "简体中文",
"insert_mode": "replace",
}
}
}
]
}
@@ -504,6 +535,23 @@ async def _perform_translation(
)
workflow = SrtWorkflow(config=workflow_config)
elif isinstance(payload, EpubWorkflowParams):
task_logger.info("构建 EpubWorkflow 配置。")
translator_config = EpubTranslatorConfig(
**payload.model_dump(include={
'base_url', 'api_key', 'model_id', 'to_lang', 'custom_prompt',
'temperature', 'thinking', 'chunk_size', 'concurrent',
'insert_mode', 'separator'
}, exclude_none=True)
)
html_exporter_config = Epub2HTMLExporterConfig(cdn=True)
workflow_config = EpubWorkflowConfig(
translator_config=translator_config,
html_exporter_config=html_exporter_config,
logger=task_logger
)
workflow = EpubWorkflow(config=workflow_config)
else:
raise TypeError(f"工作流类型 '{payload.workflow_type}' 的处理逻辑未实现。")
@@ -631,7 +679,7 @@ def _cancel_translation_logic(task_id: str):
description="""
接收一个包含文件内容Base64编码和工作流参数的JSON请求启动一个后台翻译任务。
- **工作流选择**: 请求体中的 `payload.workflow_type` 字段决定了本次任务的类型(如 `markdown_based`, `txt`, `json`, `xlsx`, `docx`, `srt`)。
- **工作流选择**: 请求体中的 `payload.workflow_type` 字段决定了本次任务的类型(如 `markdown_based`, `txt`, `json`, `xlsx`, `docx`, `srt`, `epub`)。
- **动态参数**: 根据所选工作流API需要不同的参数集。请参考下面的Schema或示例。
- **异步处理**: 此端点会立即返回任务ID客户端需轮询状态接口获取进度。
""",
@@ -754,6 +802,20 @@ async def service_release_task(task_id: str):
}
}
},
"completed_epub": {
"summary": "已完成 (EPUB)",
"value": {
"task_id": "e9b8d7c6", "is_processing": False,
"status_message": "翻译成功!用时 45.32 秒。",
"error_flag": False, "download_ready": True, "original_filename_stem": "my_book",
"original_filename": "my_book.epub", "task_start_time": 1678890000.0,
"task_end_time": 1678890045.32,
"downloads": {
"epub": "/service/download/e9b8d7c6/epub",
"html": "/service/download/e9b8d7c6/html"
}
}
},
"error": {
"summary": "失败",
"value": {
@@ -795,6 +857,8 @@ async def service_get_status(
downloads["docx"] = f"/service/download/{task_id}/docx"
if isinstance(workflow, SrtExportable):
downloads["srt"] = f"/service/download/{task_id}/srt"
if isinstance(workflow, EpubExportable):
downloads["epub"] = f"/service/download/{task_id}/epub"
return JSONResponse(content={
"task_id": task_id,
@@ -829,7 +893,7 @@ async def service_get_logs(task_id: str):
return JSONResponse(content={"logs": new_logs})
FileType = Literal["markdown", "markdown_zip", "html", "txt", "json", "xlsx", "docx", "srt"]
FileType = Literal["markdown", "markdown_zip", "html", "txt", "json", "xlsx", "docx", "srt", "epub"]
async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple[bytes, str, str]:
@@ -870,6 +934,8 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
html_config = Docx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, SrtWorkflow):
html_config = Srt2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, EpubWorkflow):
html_config = Epub2HTMLExporterConfig(cdn=is_cdn_available)
if file_type == 'html' and isinstance(workflow, HTMLExportable):
content_str = await asyncio.to_thread(workflow.export_to_html, html_config)
@@ -900,6 +966,9 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
content_str = await asyncio.to_thread(workflow.export_to_srt)
content_bytes, media_type, filename = content_str.encode(
'utf-8'), "text/plain; charset=utf-8", f"{filename_stem}_translated.srt"
elif file_type == 'epub' and isinstance(workflow, EpubExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_epub)
media_type, filename = "application/epub+zip", f"{filename_stem}_translated.epub"
else:
raise HTTPException(status_code=404, detail=f"此任务不支持导出 '{file_type}' 类型的文件。")
@@ -925,6 +994,8 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
"schema": {"type": "string", "format": "binary"}},
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
"schema": {"type": "string", "format": "binary"}},
"application/epub+zip": {
"schema": {"type": "string", "format": "binary"}},
}
},
404: {"description": "任务ID不存在或该任务不支持所请求的文件类型。"},
@@ -933,7 +1004,8 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
)
async def service_download_file(
task_id: str = FastApiPath(..., description="已完成任务的ID", examples=["b2865b93"]),
file_type: FileType = FastApiPath(..., description="要下载的文件类型。", examples=["html", "json", "docx", "srt"])
file_type: FileType = FastApiPath(..., description="要下载的文件类型。",
examples=["html", "json", "docx", "srt", "epub"])
):
content, media_type, filename = await _get_content_from_workflow(task_id, file_type)
headers = {"Content-Disposition": f"attachment; filename*=UTF-8''{quote(filename, safe='', encoding='utf-8')}"}
@@ -968,6 +1040,14 @@ async def service_download_file(
"filename": "my_doc_translated.docx",
"content": "UEsDBBQAAAAIA... (base64-encoded string)"
}
},
"epub_base64": {
"summary": "EPUB 内容 (Base64)",
"value": {
"file_type": "epub",
"filename": "my_book_translated.epub",
"content": "UEsDBBQAAAAIA... (base64-encoded string)"
}
}
}}}
},
@@ -978,7 +1058,7 @@ async def service_download_file(
async def service_content(
task_id: str = FastApiPath(..., description="已完成任务的ID", examples=["b2865b93"]),
file_type: FileType = FastApiPath(..., description="要获取内容的文件类型。",
examples=["html", "json", "docx", "srt"])
examples=["html", "json", "docx", "srt", "epub"])
):
content, _, filename = await _get_content_from_workflow(task_id, file_type)

View File

View File

@@ -0,0 +1,8 @@
from docutranslate.exporter.base import Exporter
from docutranslate.ir.document import Document
# TODO: decide whether this format needs its own dedicated document type.
# NOTE(review): the original note says "json" here; presumably it was copied
# from the JSON exporter and EPUB is meant — confirm.
class EpubExporter(Exporter[Document]):
    """Common base for exporters whose input is an EPUB ``Document``."""

    def export(self, document: Document) -> Document:
        """Convert ``document`` to the target format.

        This base implementation does nothing and returns ``None``; concrete
        exporters override it.
        """

View File

@@ -0,0 +1,8 @@
from docutranslate.exporter.epub.base import EpubExporter
from docutranslate.exporter.txt.base import TXTExporter
from docutranslate.exporter.xlsx.base import XlsxExporter
from docutranslate.ir.document import Document
class Epub2EpubExporter(EpubExporter):
    """Identity exporter: hands back the EPUB document unchanged.

    Derives from ``EpubExporter`` (it previously inherited the XLSX exporter
    base, a copy-paste slip) so that it sits in the EPUB exporter hierarchy
    alongside ``Epub2HTMLExporter``.
    """

    def export(self, document: Document) -> Document:
        """Return a copy of ``document`` so callers cannot mutate the source."""
        return document.copy()

View File

@@ -0,0 +1,351 @@
import base64
import io
import os
import zipfile
from dataclasses import dataclass
from xml.etree import ElementTree
from pathlib import Path
import re
import mimetypes
from bs4 import BeautifulSoup
from docutranslate.exporter.base import ExporterConfig
from docutranslate.exporter.epub.base import EpubExporter
from docutranslate.ir.document import Document
@dataclass
class Epub2HTMLExporterConfig(ExporterConfig):
    """Options accepted by ``Epub2HTMLExporter``."""

    # NOTE(review): this flag is not read by the exporter code in this module;
    # presumably it mirrors the ``cdn`` option of the other HTML exporters — confirm.
    cdn: bool = True
class Epub2HTMLExporter(EpubExporter):
    """Flatten an EPUB archive into one self-contained HTML page.

    Chapters are taken from the OPF spine in reading order.  Images, linked
    stylesheets and CSS ``url()`` resources that live inside the archive are
    inlined (``data:`` URIs / ``<style>`` elements).  If the OPF cannot be
    used, or yields nothing, every HTML file in the archive is concatenated in
    sorted-name order instead.
    """

    _HTML_SUFFIXES = ('.html', '.htm', '.xhtml')
    _EXTERNAL_PREFIXES = ('http://', 'https://', 'data:')
    _FALLBACK_OPF_PATHS = ('content.opf', 'OEBPS/content.opf', 'OPS/content.opf')
    _CONTAINER_ROOTFILE = './/{urn:oasis:names:tc:opendocument:xmlns:container}rootfile'
    _OPF_NS = {
        'opf': 'http://www.idpf.org/2007/opf',
        'dc': 'http://purl.org/dc/elements/1.1/',
    }
    _CSS_URL_RE = re.compile(r'url\(([^)]+)\)')

    # Page skeleton; ``{title}`` and ``{chapters}`` are filled in by export().
    _PAGE_TEMPLATE = """<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title}</title>
<style>
body {{
max-width: 800px;
margin: 0 auto;
padding: 20px;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
line-height: 1.6;
color: #333;
}}
img {{
max-width: 100%;
height: auto;
}}
.chapter {{
margin-bottom: 2em;
page-break-after: always;
}}
pre {{
white-space: pre-wrap;
word-wrap: break-word;
}}
</style>
</head>
<body>
<div class="epub-content">
{chapters}
</div>
</body>
</html>"""

    # Shown when no HTML content could be extracted at all.
    _ERROR_TEMPLATE = """<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>{title}</title>
</head>
<body>
<h1>错误:无法提取 EPUB 内容</h1>
<p>未能找到有效的 HTML 内容文件。</p>
<p>请检查 EPUB 文件格式是否正确。</p>
</body>
</html>"""

    def __init__(self, config: Epub2HTMLExporterConfig | None = None):
        config = config or Epub2HTMLExporterConfig()
        super().__init__(config=config)

    def _extract_opf_path(self, zip_file):
        """Return the archive path of the OPF package document.

        Reads ``META-INF/container.xml`` first and falls back to a few common
        locations.

        :raises FileNotFoundError: if no OPF file can be located.
        """
        try:
            container_root = ElementTree.fromstring(zip_file.read('META-INF/container.xml'))
            rootfile = container_root.find(self._CONTAINER_ROOTFILE)
            full_path = rootfile.get('full-path') if rootfile is not None else None
            # A rootfile without a usable full-path is treated like a missing one.
            if full_path:
                return full_path
        except (KeyError, ElementTree.ParseError):
            pass

        for common_path in self._FALLBACK_OPF_PATHS:
            try:
                zip_file.getinfo(common_path)
            except KeyError:
                continue
            return common_path

        raise FileNotFoundError("无法找到 OPF 文件")

    def _parse_opf(self, opf_content):
        """Parse the OPF document.

        :return: ``(manifest_items, reading_order)`` where ``manifest_items``
            maps item id to ``{'href', 'media-type'}`` and ``reading_order`` is
            the list of hrefs referenced by the spine, in spine order.
        """
        root = ElementTree.fromstring(opf_content)

        manifest_items = {}
        manifest = root.find('.//opf:manifest', self._OPF_NS)
        if manifest is not None:
            for item in manifest.findall('opf:item', self._OPF_NS):
                item_id, href = item.get('id'), item.get('href')
                # Items lacking an id or href cannot be referenced or read.
                if item_id is None or not href:
                    continue
                manifest_items[item_id] = {
                    'href': href,
                    'media-type': item.get('media-type'),
                }

        reading_order = []
        spine = root.find('.//opf:spine', self._OPF_NS)
        if spine is not None:
            for itemref in spine.findall('opf:itemref', self._OPF_NS):
                entry = manifest_items.get(itemref.get('idref'))
                if entry is not None:
                    reading_order.append(entry['href'])

        return manifest_items, reading_order

    def _read_resource(self, zip_file, base_path, reference):
        """Read the archive member that ``reference`` points to.

        ``reference`` is resolved against ``base_path`` (the archive path of the
        referring file).  The resolved name is tried as-is first and then
        percent-decoded, so both ``my%20img.png`` and ``my img.png`` entries work.

        :return: ``(archive_path, data)``.
        :raises KeyError: if neither candidate exists in the archive.
        """
        from urllib.parse import unquote  # local import: not among this module's top-level imports

        resolved = self._resolve_path(base_path, reference)
        candidates = [resolved]
        decoded = unquote(resolved)
        if decoded != resolved:
            candidates.append(decoded)
        for candidate in candidates:
            try:
                return candidate, zip_file.read(candidate)
            except KeyError:
                continue
        raise KeyError(resolved)

    def _process_html_content(self, html_content, zip_file, base_path, manifest_items):
        """Inline images and stylesheets referenced by one HTML document.

        :param html_content: markup of the document.
        :param zip_file: the open EPUB archive.
        :param base_path: archive path of the document itself (references are
            resolved relative to it).
        :param manifest_items: accepted for interface compatibility; unused.
        :return: the rewritten markup as a string.
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # Images -> data: URIs.  Missing or untyped images keep their original src.
        for img in soup.find_all('img'):
            src = img.get('src')
            if not src or src.startswith(self._EXTERNAL_PREFIXES):
                continue
            try:
                img_path, img_data = self._read_resource(zip_file, base_path, src)
            except KeyError:
                continue
            mime_type, _ = mimetypes.guess_type(img_path)
            if mime_type:
                encoded = base64.b64encode(img_data).decode('utf-8')
                img['src'] = f"data:{mime_type};base64,{encoded}"

        # Embedded <style>: url() references are relative to this document.
        for style_tag in soup.find_all('style'):
            if style_tag.string:
                style_tag.string = self._process_css_urls(style_tag.string, zip_file, base_path)

        # Linked stylesheets -> inline <style>; unreadable ones are removed.
        for link in soup.find_all('link', {'rel': 'stylesheet'}):
            href = link.get('href')
            if not href:
                continue
            try:
                css_path, css_bytes = self._read_resource(zip_file, base_path, href)
                css_content = css_bytes.decode('utf-8')
            except (KeyError, UnicodeDecodeError):
                link.decompose()
                continue
            inline_style = soup.new_tag('style')
            # url() inside a stylesheet is relative to the stylesheet, not to
            # the HTML file that links it, hence css_path as the base.
            inline_style.string = self._process_css_urls(css_content, zip_file, css_path)
            link.replace_with(inline_style)

        return str(soup)

    def _process_css_urls(self, css_content, zip_file, base_path):
        """Replace archive-local ``url(...)`` references in CSS with ``data:`` URIs.

        External (http/https/data) references, missing files and files whose
        MIME type cannot be guessed are left untouched.
        """

        def replace_url(match):
            url = match.group(1).strip().strip('\'"')
            if url.startswith(self._EXTERNAL_PREFIXES):
                return match.group(0)
            try:
                resource_path, resource_data = self._read_resource(zip_file, base_path, url)
            except KeyError:
                return match.group(0)
            mime_type, _ = mimetypes.guess_type(resource_path)
            if not mime_type:
                return match.group(0)
            encoded = base64.b64encode(resource_data).decode('utf-8')
            return f'url("data:{mime_type};base64,{encoded}")'

        return self._CSS_URL_RE.sub(replace_url, css_content)

    def _resolve_path(self, base_path, relative_path):
        """Resolve ``relative_path`` against the file at ``base_path``.

        ``base_path`` is the archive path of the *referring file* (its directory
        is used).  The result is a normalised archive name: ``#fragment`` and
        ``?query`` parts are dropped and ``.``/``..`` segments are collapsed,
        because ZIP member names never contain them.
        """
        import posixpath  # local import: ZIP member names always use '/'

        reference = relative_path.split('#', 1)[0].split('?', 1)[0].replace('\\', '/')
        if reference.startswith('/'):
            joined = reference.lstrip('/')
        else:
            base_dir = posixpath.dirname(base_path.replace('\\', '/'))
            joined = posixpath.join(base_dir, reference) if base_dir else reference
        # normpath('') would yield '.', which is never a member name.
        return posixpath.normpath(joined) if joined else joined

    def _find_html_files(self, zip_file):
        """Return the sorted names of all HTML files outside ``META-INF/``."""
        return sorted(
            info.filename
            for info in zip_file.filelist
            if info.filename.lower().endswith(self._HTML_SUFFIXES)
            and not info.filename.startswith('META-INF/')
        )

    def _render_chapter(self, zip_file, member_path, manifest_items):
        """Return the inlined ``<body>`` (or whole markup if there is none) of one file.

        :raises KeyError: if ``member_path`` is not in the archive.
        :raises UnicodeDecodeError: if the file is not valid UTF-8.
        """
        markup = zip_file.read(member_path).decode('utf-8')
        processed = self._process_html_content(markup, zip_file, member_path, manifest_items)
        body = BeautifulSoup(processed, 'html.parser').find('body')
        return str(body) if body is not None else processed

    def _collect_spine_chapters(self, zip_file):
        """Render every spine item that can be found, in reading order."""
        from urllib.parse import unquote  # local import: not among this module's top-level imports

        opf_path = self._extract_opf_path(zip_file)
        manifest_items, reading_order = self._parse_opf(zip_file.read(opf_path))

        chapters = []
        for href in reading_order:
            # hrefs are relative to the OPF file itself, so the OPF path (not
            # its directory) is the base handed to _resolve_path.
            resolved = self._resolve_path(opf_path, href)
            candidates = (
                resolved,
                unquote(resolved),
                href,                                 # as written in the manifest
                href.replace('.html', ''),            # without the .html suffix
                href.replace('.htm.html', '.htm'),    # doubled suffix
            )
            for candidate in candidates:
                try:
                    chapters.append(self._render_chapter(zip_file, candidate, manifest_items))
                except (KeyError, UnicodeDecodeError):
                    continue
                break
        return chapters

    def _collect_all_chapters(self, zip_file):
        """Fallback: render every readable HTML file in sorted-name order."""
        chapters = []
        for member_path in self._find_html_files(zip_file):
            try:
                chapters.append(self._render_chapter(zip_file, member_path, {}))
            except (KeyError, UnicodeDecodeError):
                continue
        return chapters

    def export(self, document: Document) -> Document:
        """Convert the binary content of an EPUB file into a single HTML file.

        :param document: ``Document`` holding the EPUB bytes.
        :return: ``Document`` (suffix ``.html``, same stem) holding the combined
            page, or a short error page when no HTML content could be extracted.
        """
        from html import escape  # local import: not among this module's top-level imports

        with zipfile.ZipFile(io.BytesIO(document.content), 'r') as zip_file:
            try:
                chapters = self._collect_spine_chapters(zip_file)
            except Exception:
                # Deliberately best-effort: any problem with the OPF route
                # falls through to scanning the archive for HTML files.
                chapters = []
            if not chapters:
                chapters = self._collect_all_chapters(zip_file)

        # The stem comes from the uploaded file name; escape it before
        # placing it inside <title>.
        title = escape(str(document.stem))
        if chapters:
            html_content = self._PAGE_TEMPLATE.format(
                title=title,
                chapters=''.join(f'<div class="chapter">{part}</div>' for part in chapters),
            )
        else:
            html_content = self._ERROR_TEMPLATE.format(title=title)

        return Document.from_bytes(content=html_content.encode("utf-8"), suffix=".html", stem=document.stem)
if __name__ == '__main__':
    # Manual smoke test: convert one EPUB to HTML.
    # Usage: python epub2html_exporter.py [input.epub] [output.html]
    # Without arguments the original author's local paths are used.
    import sys
    from pathlib import Path

    source_path = sys.argv[1] if len(sys.argv) > 1 else r"C:\Users\jxgm\Downloads\pg6593-images.epub"
    output_path = (
        sys.argv[2] if len(sys.argv) > 2
        else r"C:\Users\jxgm\Desktop\translate\docutranslate\tests\output\output.html"
    )

    doc_original = Document.from_path(source_path)
    html_exp = Epub2HTMLExporter().export(doc_original)
    Path(output_path).write_bytes(html_exp.content)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,200 @@
import asyncio
import os
import zipfile
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from io import BytesIO
from typing import Self, Literal, List, Dict, Any
from bs4 import BeautifulSoup
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig
from docutranslate.translator.base import Translator
@dataclass
class EpubTranslatorConfig(AiTranslatorConfig):
    """Settings for ``EpubTranslator`` on top of the generic AI translator options."""

    # How a translated segment is combined with its source text.
    insert_mode: Literal["replace", "append", "prepend"] = "replace"
    # Placed between source and translation in the 'append' / 'prepend' modes.
    separator: str = "\n"
class EpubTranslator(Translator):
    """Translate the text of the XHTML/HTML content documents inside an EPUB.

    The archive is handled with the built-in ``zipfile`` and ``xml`` modules;
    ``ebooklib`` is not required.
    """

    _CONTAINER_NS = {'cn': 'urn:oasis:names:tc:opendocument:xmlns:container'}
    _OPF_NS = {'opf': 'http://www.idpf.org/2007/opf'}
    _CONTENT_MEDIA_TYPES = ('application/xhtml+xml', 'text/html')
    # Text directly inside these elements is never sent for translation.
    _SKIPPED_PARENTS = ('style', 'script', 'head', 'title', 'meta', '[document]')

    def __init__(self, config: EpubTranslatorConfig):
        super().__init__(config=config)
        self.chunk_size = config.chunk_size
        agent_config = SegmentsTranslateAgentConfig(
            custom_prompt=config.custom_prompt, to_lang=config.to_lang,
            baseurl=config.base_url, key=config.api_key, model_id=config.model_id,
            system_prompt=None, temperature=config.temperature, thinking=config.thinking,
            max_concurrent=config.concurrent, timeout=config.timeout, logger=self.logger
        )
        self.translate_agent = SegmentsTranslateAgent(agent_config)
        self.insert_mode = config.insert_mode
        self.separator = config.separator

    @staticmethod
    def _resolve_member(opf_dir: str, href: str, all_files: Dict[str, bytes]) -> str:
        """Map a manifest ``href`` to the name of an archive member.

        Tries the plain join first, then a normalised form (``..`` collapsed,
        ``#fragment`` removed) and its percent-decoded variant.  When none
        exists the plain join is returned so the caller can report it.
        """
        from urllib.parse import unquote  # local import: not among this module's top-level imports

        plain = os.path.join(opf_dir, href).replace('\\', '/')
        normalized = os.path.normpath(
            os.path.join(opf_dir, href.split('#', 1)[0])
        ).replace('\\', '/')
        for candidate in (plain, normalized, unquote(normalized)):
            if candidate in all_files:
                return candidate
        return plain

    def _content_document_paths(self, all_files: Dict[str, bytes]) -> List[str]:
        """Return archive paths of all (X)HTML manifest items, in manifest order.

        :raises ValueError: if ``container.xml``, its rootfile entry or the OPF
            document is missing.
        """
        container_xml = all_files.get('META-INF/container.xml')
        if not container_xml:
            raise ValueError("无效的 EPUB找不到 META-INF/container.xml")

        rootfile = ET.fromstring(container_xml).find('cn:rootfiles/cn:rootfile', self._CONTAINER_NS)
        opf_path = rootfile.get('full-path') if rootfile is not None else None
        if not opf_path:
            # Previously surfaced as an AttributeError on None.
            raise ValueError("无效的 EPUB:container.xml 中找不到 rootfile 的 full-path")

        opf_xml = all_files.get(opf_path)
        if not opf_xml:
            raise ValueError(f"无效的 EPUB找不到 {opf_path}")

        # Manifest hrefs are relative to the directory that holds the OPF file.
        opf_dir = os.path.dirname(opf_path)
        paths: Dict[str, None] = {}  # dict keeps order and drops duplicate targets
        for item in ET.fromstring(opf_xml).findall('opf:manifest/opf:item', self._OPF_NS):
            href = item.get('href')
            if not href or item.get('media-type') not in self._CONTENT_MEDIA_TYPES:
                continue
            paths[self._resolve_member(opf_dir, href, all_files)] = None
        return list(paths)

    def _pre_translate(self, document: Document) -> tuple[
        Dict[str, bytes], List[Dict[str, Any]], List[str]
    ]:
        """Load the EPUB and collect every text node that should be translated.

        :return: ``(all_files, items_to_translate, original_texts)``.
            ``all_files`` maps archive member name to bytes.  Each item holds
            ``file_path``, ``text_node``, ``original_text`` (stripped) and
            ``soup`` (the parsed document the node belongs to);
            ``original_texts`` is the parallel list of stripped strings.
        :raises ValueError: if the EPUB metadata is missing (see
            ``_content_document_paths``).
        """
        with zipfile.ZipFile(BytesIO(document.content), 'r') as zf:
            all_files = {filename: zf.read(filename) for filename in zf.namelist()}

        items_to_translate: List[Dict[str, Any]] = []
        original_texts: List[str] = []

        # All (X)HTML files listed in the manifest are translated.
        for file_path in self._content_document_paths(all_files):
            content_bytes = all_files.get(file_path)
            if not content_bytes:
                self.logger.warning(f"在 EPUB 中找不到文件: {file_path}")
                continue

            soup = BeautifulSoup(content_bytes, "html.parser")
            for text_node in soup.find_all(string=True):
                if text_node.parent.name in self._SKIPPED_PARENTS or text_node.isspace():
                    continue
                text = text_node.get_text(strip=True)
                if not text:
                    continue
                items_to_translate.append({
                    "file_path": file_path,
                    "text_node": text_node,
                    "original_text": text,
                    # Kept so the whole document (XML declaration and DOCTYPE
                    # included) can be serialised again afterwards.
                    "soup": soup,
                })
                original_texts.append(text)

        return all_files, items_to_translate, original_texts

    def _after_translate(
            self,
            all_files: Dict[str, bytes],
            items_to_translate: List[Dict[str, Any]],
            translated_texts: List[str],
            original_texts: List[str],
    ) -> bytes:
        """Write the translations back and repackage the EPUB.

        :param all_files: archive member name -> bytes; updated in place for
            every file that received translations.
        :param items_to_translate: items produced by ``_pre_translate``.
        :param translated_texts: translations, parallel to ``items_to_translate``.
        :param original_texts: stripped source strings, parallel as well.
        :return: bytes of the new EPUB (ZIP) archive.
        """
        modified_soups = {}  # file_path -> root node to serialise

        for i, item_info in enumerate(items_to_translate):
            file_path = item_info["file_path"]
            text_node = item_info["text_node"]
            translated_text = translated_texts[i]
            original_text = original_texts[i]

            if file_path not in modified_soups:
                root = item_info.get("soup")
                if root is None:
                    # Item built without a "soup" entry: climb to the top of the
                    # tree.  Must happen before replace_with() detaches the node.
                    root = text_node
                    while root.parent is not None:
                        root = root.parent
                modified_soups[file_path] = root

            if self.insert_mode == "append":
                new_text = original_text + self.separator + translated_text
            elif self.insert_mode == "prepend":
                new_text = translated_text + self.separator + original_text
            else:  # "replace" and any unknown mode
                new_text = translated_text

            # Only the stripped text was translated; put the node's own leading
            # and trailing whitespace back so inline neighbours stay separated.
            raw = str(text_node)
            leading = raw[:len(raw) - len(raw.lstrip())]
            trailing = raw[len(raw.rstrip()):]
            text_node.replace_with(leading + new_text + trailing)

        for file_path, soup in modified_soups.items():
            all_files[file_path] = str(soup).encode('utf-8')

        output_buffer = BytesIO()
        with zipfile.ZipFile(output_buffer, 'w') as zf_out:
            # 'mimetype' has to be the first entry and must be stored uncompressed.
            if 'mimetype' in all_files:
                zf_out.writestr('mimetype', all_files['mimetype'], compress_type=zipfile.ZIP_STORED)
            for filename, content in all_files.items():
                if filename != 'mimetype':
                    zf_out.writestr(filename, content, compress_type=zipfile.ZIP_DEFLATED)

        return output_buffer.getvalue()

    def translate(self, document: Document) -> Self:
        """Translate ``document`` in place (synchronous).

        ``document.content`` is replaced with the translated EPUB; it is left
        untouched when no translatable text is found.
        """
        all_files, items_to_translate, original_texts = self._pre_translate(document)
        if not items_to_translate:
            self.logger.info("\n文件中没有找到需要翻译的纯文本内容。")
            return self

        translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size)
        document.content = self._after_translate(
            all_files, items_to_translate, translated_texts, original_texts
        )
        return self

    async def translate_async(self, document: Document) -> Self:
        """Translate ``document`` in place (asynchronous).

        Parsing and repackaging run in worker threads via ``asyncio.to_thread``;
        the translation itself uses the agent's async API.
        """
        all_files, items_to_translate, original_texts = await asyncio.to_thread(
            self._pre_translate, document
        )
        if not items_to_translate:
            self.logger.info("\n文件中没有找到需要翻译的纯文本内容。")
            return self

        translated_texts = await self.translate_agent.send_segments_async(
            original_texts, self.chunk_size
        )
        document.content = await asyncio.to_thread(
            self._after_translate, all_files, items_to_translate, translated_texts, original_texts
        )
        return self

View File

@@ -0,0 +1,66 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Self
from docutranslate.exporter.base import ExporterConfig
from docutranslate.exporter.epub.epub2epub_exporter import Epub2EpubExporter
from docutranslate.exporter.epub.epub2html_exporter import Epub2HTMLExporterConfig, Epub2HTMLExporter
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.epub_translator import EpubTranslatorConfig, EpubTranslator
from docutranslate.workflow.base import Workflow, WorkflowConfig
from docutranslate.workflow.interfaces import HTMLExportable, EpubExportable
@dataclass(kw_only=True)
class EpubWorkflowConfig(WorkflowConfig):
    """Configuration bundle for ``EpubWorkflow`` (keyword-only fields)."""

    # Settings passed to EpubTranslator.
    translator_config: EpubTranslatorConfig
    # Default settings for the HTML export when the caller supplies none.
    html_exporter_config: Epub2HTMLExporterConfig
class EpubWorkflow(Workflow[EpubWorkflowConfig, Document, Document], HTMLExportable[Epub2HTMLExporterConfig],
                   EpubExportable[ExporterConfig]):
    """Workflow that translates an EPUB and exports the result as EPUB or HTML."""

    def __init__(self, config: EpubWorkflowConfig):
        super().__init__(config=config)
        # Share the workflow logger with the translator configuration.
        translator_config = self.config.translator_config
        if config.logger and translator_config:
            translator_config.logger = config.logger

    def _pre_translate(self, document_original: Document):
        """Return a working copy of the source document and a fresh translator."""
        document = document_original.copy()
        translator = EpubTranslator(self.config.translator_config)
        return document, translator

    def translate(self) -> Self:
        """Translate the original document synchronously; stores ``document_translated``."""
        document, translator = self._pre_translate(self.document_original)
        translator.translate(document)
        self.document_translated = document
        return self

    async def translate_async(self) -> Self:
        """Translate the original document asynchronously; stores ``document_translated``."""
        document, translator = self._pre_translate(self.document_original)
        await translator.translate_async(document)
        self.document_translated = document
        return self

    def export_to_html(self, config: Epub2HTMLExporterConfig | None = None) -> str:
        """Export as a single HTML page and return it as text.

        :param config: exporter settings; defaults to the workflow's
            ``html_exporter_config``.
        """
        config = config or self.config.html_exporter_config
        docu = self._export(Epub2HTMLExporter(config))
        # The exporter encodes its output as UTF-8.
        return docu.content.decode("utf-8")

    def export_to_epub(self, _: ExporterConfig | None = None) -> bytes:
        """Export as EPUB bytes.  The config argument is accepted but ignored."""
        docu = self._export(Epub2EpubExporter())
        return docu.content

    def save_as_html(self, name: str | None = None, output_dir: Path | str = "./output",
                     config: Epub2HTMLExporterConfig | None = None) -> Self:
        """Save the HTML export via the base class ``_save`` helper.

        :param config: exporter settings (an ``Epub2HTMLExporterConfig``, not an
            exporter instance); defaults to the workflow's ``html_exporter_config``.
        """
        config = config or self.config.html_exporter_config
        self._save(exporter=Epub2HTMLExporter(config), name=name, output_dir=output_dir)
        return self

    def save_as_epub(self, name: str | None = None, output_dir: Path | str = "./output",
                     _: ExporterConfig | None = None) -> Self:
        """Save the EPUB export via ``_save``.  The config argument is ignored."""
        self._save(exporter=Epub2EpubExporter(), name=name, output_dir=output_dir)
        return self

View File

@@ -79,3 +79,11 @@ class SrtExportable(Protocol[T_ExporterConfig]):
def save_as_srt(self, name: str, output_dir: Path | str, config: T_ExporterConfig | None = None) -> Self:
...
@runtime_checkable
class EpubExportable(Protocol[T_ExporterConfig]):
    """Structural interface for objects that can produce an EPUB export."""

    def export_to_epub(self, config: T_ExporterConfig | None = None) -> bytes:
        """Return the EPUB export as raw bytes."""

    def save_as_epub(self, name: str, output_dir: Path | str, config: T_ExporterConfig | None = None) -> Self:
        """Save the EPUB export using ``name`` and ``output_dir``; returns ``Self``."""

View File

@@ -13,6 +13,7 @@ dependencies = [
"json-repair>=0.48.0",
"mammoth>=1.10.0",
"srt>=3.5.3",
"lxml>=5.4.0",
]
dynamic = ["version"]

2
uv.lock generated
View File

@@ -319,6 +319,7 @@ dependencies = [
{ name = "httpx" },
{ name = "json-repair" },
{ name = "jsonpath-ng" },
{ name = "lxml" },
{ name = "mammoth" },
{ name = "markdown2" },
{ name = "openpyxl" },
@@ -346,6 +347,7 @@ requires-dist = [
{ name = "httpx", specifier = "==0.27.2" },
{ name = "json-repair", specifier = ">=0.48.0" },
{ name = "jsonpath-ng", specifier = ">=1.7.0" },
{ name = "lxml", specifier = ">=5.4.0" },
{ name = "mammoth", specifier = ">=1.10.0" },
{ name = "markdown2", specifier = ">=2.5.3" },
{ name = "opencv-python", marker = "extra == 'docling'", specifier = ">=4.11.0.86" },