增加srt翻译支持

This commit is contained in:
xunbu
2025-08-08 19:13:59 +08:00
parent a7d1a17166
commit 8e192e63e4
12 changed files with 363 additions and 9 deletions

View File

@@ -30,9 +30,10 @@ from docutranslate.workflow.base import Workflow
from docutranslate.workflow.docx_workflow import DocxWorkflow, DocxWorkflowConfig
from docutranslate.workflow.interfaces import DocxExportable
from docutranslate.workflow.interfaces import HTMLExportable, MDFormatsExportable, TXTExportable, JsonExportable, \
XlsxExportable
XlsxExportable, SrtExportable
from docutranslate.workflow.json_workflow import JsonWorkflow, JsonWorkflowConfig
from docutranslate.workflow.md_based_workflow import MarkdownBasedWorkflow, MarkdownBasedWorkflowConfig
from docutranslate.workflow.srt_workflow import SrtWorkflow, SrtWorkflowConfig
from docutranslate.workflow.txt_workflow import TXTWorkflow, TXTWorkflowConfig
from docutranslate.workflow.xlsx_workflow import XlsxWorkflow, XlsxWorkflowConfig
@@ -49,6 +50,8 @@ from docutranslate.translator.ai_translator.xlsx_translator import XlsxTranslato
from docutranslate.exporter.xlsx.xlsx2html_exporter import Xlsx2HTMLExporterConfig
from docutranslate.translator.ai_translator.docx_translator import DocxTranslatorConfig
from docutranslate.exporter.docx.docx2html_exporter import Docx2HTMLExporterConfig
from docutranslate.translator.ai_translator.srt_translator import SrtTranslatorConfig
from docutranslate.exporter.srt.srt2html_exporter import Srt2HTMLExporterConfig
# ------------------------------------
from docutranslate.logger import global_logger
@@ -69,6 +72,7 @@ WORKFLOW_DICT: Dict[str, Type[Workflow]] = {
"json": JsonWorkflow,
"xlsx": XlsxWorkflow,
"docx": DocxWorkflow,
"srt": SrtWorkflow,
}
@@ -251,9 +255,22 @@ class DocxWorkflowParams(BaseWorkflowParams):
)
class SrtWorkflowParams(BaseWorkflowParams):
workflow_type: Literal['srt'] = Field(..., description="指定使用SRT字幕的翻译工作流。")
insert_mode: Literal["replace", "append", "prepend"] = Field(
"replace",
description="翻译文本的插入模式。'replace':替换原文,'append':附加到原文后,'prepend':附加到原文前。"
)
separator: str = Field(
"\n",
description="当 insert_mode 为 'append''prepend' 时,用于分隔原文和译文的分隔符。"
)
# 3. 使用可辨识联合类型Discriminated Union将它们组合起来
TranslatePayload = Annotated[
Union[MarkdownWorkflowParams, TextWorkflowParams, JsonWorkflowParams, XlsxWorkflowParams, DocxWorkflowParams],
Union[
MarkdownWorkflowParams, TextWorkflowParams, JsonWorkflowParams, XlsxWorkflowParams, DocxWorkflowParams, SrtWorkflowParams],
Field(discriminator='workflow_type')
]
@@ -261,7 +278,7 @@ TranslatePayload = Annotated[
# 4. 创建最终的请求体模型
class TranslateServiceRequest(BaseModel):
file_name: str = Field(..., description="上传的原始文件名,含扩展名。",
examples=["my_paper.pdf", "chapter1.txt", "data.xlsx"])
examples=["my_paper.pdf", "chapter1.txt", "data.xlsx", "video.srt"])
file_content: str = Field(..., description="Base64编码的文件内容。", examples=["JVBERi0xLjQK..."])
payload: TranslatePayload = Field(..., description="包含工作流类型和相应参数的载荷。")
@@ -331,6 +348,21 @@ class TranslateServiceRequest(BaseModel):
"insert_mode": "replace",
}
}
},
{
"summary": "SRT 工作流示例",
"value": {
"file_name": "movie.srt",
"file_content": "MSAKMDA6MDA6MDEsMjAwIC0tPiAwMDowMD...",
"payload": {
"workflow_type": "srt",
"base_url": "https://api.openai.com/v1",
"api_key": "sk-your-api-key-here",
"model_id": "gpt-4o",
"to_lang": "English",
"insert_mode": "replace",
}
}
}
]
}
@@ -455,6 +487,23 @@ async def _perform_translation(
)
workflow = DocxWorkflow(config=workflow_config)
elif isinstance(payload, SrtWorkflowParams):
task_logger.info("构建 SrtWorkflow 配置。")
translator_config = SrtTranslatorConfig(
**payload.model_dump(include={
'base_url', 'api_key', 'model_id', 'to_lang', 'custom_prompt',
'temperature', 'thinking', 'chunk_size', 'concurrent',
'insert_mode', 'separator'
}, exclude_none=True)
)
html_exporter_config = Srt2HTMLExporterConfig(cdn=True)
workflow_config = SrtWorkflowConfig(
translator_config=translator_config,
html_exporter_config=html_exporter_config,
logger=task_logger
)
workflow = SrtWorkflow(config=workflow_config)
else:
raise TypeError(f"工作流类型 '{payload.workflow_type}' 的处理逻辑未实现。")
@@ -582,7 +631,7 @@ def _cancel_translation_logic(task_id: str):
description="""
接收一个包含文件内容Base64编码和工作流参数的JSON请求启动一个后台翻译任务。
- **工作流选择**: 请求体中的 `payload.workflow_type` 字段决定了本次任务的类型(如 `markdown_based`, `txt`, `json`, `xlsx`, `docx`)。
- **工作流选择**: 请求体中的 `payload.workflow_type` 字段决定了本次任务的类型(如 `markdown_based`, `txt`, `json`, `xlsx`, `docx`, `srt`)。
- **动态参数**: 根据所选工作流API需要不同的参数集。请参考下面的Schema或示例。
- **异步处理**: 此端点会立即返回任务ID客户端需轮询状态接口获取进度。
""",
@@ -744,6 +793,8 @@ async def service_get_status(
downloads["xlsx"] = f"/service/download/{task_id}/xlsx"
if isinstance(workflow, DocxExportable):
downloads["docx"] = f"/service/download/{task_id}/docx"
if isinstance(workflow, SrtExportable):
downloads["srt"] = f"/service/download/{task_id}/srt"
return JSONResponse(content={
"task_id": task_id,
@@ -778,7 +829,7 @@ async def service_get_logs(task_id: str):
return JSONResponse(content={"logs": new_logs})
FileType = Literal["markdown", "markdown_zip", "html", "txt", "json", "xlsx", "docx"]
FileType = Literal["markdown", "markdown_zip", "html", "txt", "json", "xlsx", "docx", "srt"]
async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple[bytes, str, str]:
@@ -817,6 +868,8 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
html_config = Xlsx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, DocxWorkflow):
html_config = Docx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, SrtWorkflow):
html_config = Srt2HTMLExporterConfig(cdn=is_cdn_available)
if file_type == 'html' and isinstance(workflow, HTMLExportable):
content_str = await asyncio.to_thread(workflow.export_to_html, html_config)
@@ -843,6 +896,10 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
elif file_type == 'docx' and isinstance(workflow, DocxExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_docx)
media_type, filename = "application/vnd.openxmlformats-officedocument.wordprocessingml.document", f"{filename_stem}_translated.docx"
elif file_type == 'srt' and isinstance(workflow, SrtExportable):
content_str = await asyncio.to_thread(workflow.export_to_srt)
content_bytes, media_type, filename = content_str.encode(
'utf-8'), "text/plain; charset=utf-8", f"{filename_stem}_translated.srt"
else:
raise HTTPException(status_code=404, detail=f"此任务不支持导出 '{file_type}' 类型的文件。")
@@ -861,6 +918,7 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
"content": {
"text/html; charset=utf-8": {"schema": {"type": "string"}},
"text/markdown; charset=utf-8": {"schema": {"type": "string"}},
"text/plain; charset=utf-8": {"schema": {"type": "string"}},
"application/zip": {"schema": {"type": "string", "format": "binary"}},
"application/json": {"schema": {"type": "string", "format": "binary"}},
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": {
@@ -875,7 +933,7 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
)
async def service_download_file(
task_id: str = FastApiPath(..., description="已完成任务的ID", examples=["b2865b93"]),
file_type: FileType = FastApiPath(..., description="要下载的文件类型。", examples=["html", "json", "docx"])
file_type: FileType = FastApiPath(..., description="要下载的文件类型。", examples=["html", "json", "docx", "srt"])
):
content, media_type, filename = await _get_content_from_workflow(task_id, file_type)
headers = {"Content-Disposition": f"attachment; filename*=UTF-8''{quote(filename, safe='', encoding='utf-8')}"}
@@ -919,7 +977,8 @@ async def service_download_file(
)
async def service_content(
task_id: str = FastApiPath(..., description="已完成任务的ID", examples=["b2865b93"]),
file_type: FileType = FastApiPath(..., description="要获取内容的文件类型。", examples=["html", "json", "docx"])
file_type: FileType = FastApiPath(..., description="要获取内容的文件类型。",
examples=["html", "json", "docx", "srt"])
):
content, _, filename = await _get_content_from_workflow(task_id, file_type)
@@ -1053,4 +1112,4 @@ def run_app(port: int | None = None):
if __name__ == "__main__":
run_app()
run_app()

View File

View File

@@ -0,0 +1,8 @@
from docutranslate.exporter.base import Exporter
from docutranslate.ir.document import Document
#TODO:看情况是否需要为TXT单独写一个document类型
class SrtExporter(Exporter[Document]):
def export(self,document:Document)->Document:
...

View File

@@ -0,0 +1,40 @@
from dataclasses import dataclass
import jinja2
import srt
from docutranslate.exporter.base import ExporterConfig
from docutranslate.exporter.srt.base import SrtExporter
from docutranslate.ir.document import Document
from docutranslate.utils.resource_utils import resource_path
@dataclass
class Srt2HTMLExporterConfig(ExporterConfig):
cdn: bool = True
class Srt2HTMLExporter(SrtExporter):
def __init__(self, config: Srt2HTMLExporterConfig = None):
config = config or Srt2HTMLExporterConfig()
super().__init__(config=config)
self.cdn = config.cdn
def export(self, document: Document) -> Document:
cdn = self.cdn
srt_string=document.content.decode("utf-8")
subs = list(srt.parse(srt_string))
for sub in subs:
sub.content = sub.content.replace('\n', '<br>')
html_template = resource_path("template/srt.html").read_text(encoding="utf-8")
# language=html
pico = f'<style>{resource_path("static/pico.css").read_text(encoding="utf-8")}</style>' if not cdn else r'<link rel="stylesheet" href="https://s4.zstatic.net/ajax/libs/picocss/2.1.1/pico.min.css" integrity="sha512-+4kjFgVD0n6H3xt19Ox84B56MoS7srFn60tgdWFuO4hemtjhySKyW4LnftYZn46k3THUEiTTsbVjrHai+0MOFw==" crossorigin="anonymous" referrerpolicy="no-referrer" />'
render = jinja2.Template(html_template).render(
title=document.stem,
pico=pico,
subtitles=subs
)
return Document.from_bytes(content=render.encode("utf-8"), suffix=".html", stem=document.stem)

View File

@@ -0,0 +1,7 @@
from docutranslate.exporter.srt.base import SrtExporter
from docutranslate.ir.document import Document
class Srt2SrtExporter(SrtExporter):
def export(self, document: Document) -> Document:
return document.copy()

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,20 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>{{ tittle }}</title>
</head>
<body>
{% for sub in subtitles %}
<p>
<b>{{ sub.index }}</b><br>
{{ sub.start }} --> {{ sub.end }}<br>
{{ sub.content | safe }}
</p>
{% else %}
<p>没有字幕内容可显示。</p>
{% endfor %}
</body>
</html>

View File

@@ -0,0 +1,137 @@
import asyncio
from dataclasses import dataclass
from typing import Self, Literal
import srt # 导入srt库来处理字幕文件
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig
from docutranslate.translator.base import Translator
@dataclass
class SrtTranslatorConfig(AiTranslatorConfig):
insert_mode: Literal["replace", "append", "prepend"] = "replace"
separator: str = "\n"
class SrtTranslator(Translator):
"""
一个用于翻译 SRT (.srt) 字幕文件的翻译器。
它会提取每个字幕块的文本内容,进行翻译,然后根据配置将译文写回。
"""
def __init__(self, config: SrtTranslatorConfig):
super().__init__(config=config)
self.chunk_size = config.chunk_size
agent_config = SegmentsTranslateAgentConfig(
custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
baseurl=config.base_url,
key=config.api_key,
model_id=config.model_id,
system_prompt=None,
temperature=config.temperature,
thinking=config.thinking,
max_concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger
)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode
self.separator = config.separator
def _pre_translate(self, document: Document):
"""
预处理步骤解析SRT文件提取所有字幕文本。
Returns:
tuple: (解析后的字幕对象列表, 待翻译的原文文本列表)
"""
try:
# 使用 utf-8-sig 解码以处理可能存在的BOM (Byte Order Mark)
srt_content = document.content.decode('utf-8-sig')
except (UnicodeDecodeError, AttributeError) as e:
self.logger.error(f"无法解码SRT文件内容请确保文件编码为UTF-8: {e}")
return [], []
# 使用 srt 库解析内容
try:
subtitles = list(srt.parse(srt_content))
except srt.SRTParseError as e:
self.logger.error(f"解析SRT文件失败: {e}")
return [], []
# 提取所有原文文本,准备进行批量翻译
original_texts = [sub.content for sub in subtitles]
return subtitles, original_texts
def _after_translate(self, subtitles: list[srt.Subtitle], translated_texts: list[str],
original_texts: list[str]) -> bytes:
"""
翻译后处理步骤将译文根据配置模式写回字幕对象并生成新的SRT文件内容。
Returns:
bytes: 新的SRT文件内容的字节流。
"""
for i, sub in enumerate(subtitles):
translated_text = translated_texts[i]
original_text = original_texts[i]
# 根据插入模式更新字幕内容
if self.insert_mode == "replace":
sub.content = translated_text
elif self.insert_mode == "append":
# strip() 避免在原文和译文间产生多余的空白
sub.content = original_text.strip() + self.separator + translated_text.strip()
elif self.insert_mode == "prepend":
sub.content = translated_text.strip() + self.separator + original_text.strip()
else:
self.logger.error(f"不正确的SrtTranslatorConfig参数: insert_mode='{self.insert_mode}'")
# 默认回退到替换模式,避免程序中断
sub.content = translated_text
# 使用 srt 库将修改后的字幕对象列表重新合成为SRT格式的字符串
new_srt_content_str = srt.compose(subtitles)
# 返回UTF-8编码的字节流
return new_srt_content_str.encode('utf-8')
def translate(self, document: Document) -> Self:
"""
同步翻译SRT文档。
"""
subtitles, original_texts = self._pre_translate(document)
if not original_texts:
self.logger.info("\n文件中没有找到需要翻译的字幕内容。")
return self
# --- 步骤 2: 调用翻译Agent ---
translated_texts = self.translate_agent.send_segments(original_texts, self.chunk_size)
# --- 步骤 3: 后处理并更新文档内容 ---
document.content = self._after_translate(subtitles, translated_texts, original_texts)
return self
async def translate_async(self, document: Document) -> Self:
"""
异步翻译SRT文档。
"""
# I/O密集型操作在线程中运行
subtitles, original_texts = await asyncio.to_thread(self._pre_translate, document)
if not original_texts:
self.logger.info("\n文件中没有找到需要翻译的字幕内容。")
return self
# --- 步骤 2: 调用翻译Agent (异步) ---
translated_texts = await self.translate_agent.send_segments_async(original_texts, self.chunk_size)
# --- 步骤 3: 后处理并更新文档内容 (I/O密集型) ---
document.content = await asyncio.to_thread(
self._after_translate, subtitles, translated_texts, original_texts
)
return self

View File

@@ -70,4 +70,12 @@ class DocxExportable(Protocol[T_ExporterConfig]):
...
def save_as_docx(self, name: str, output_dir: Path | str, config: T_ExporterConfig | None = None) -> Self:
...
@runtime_checkable
class SrtExportable(Protocol[T_ExporterConfig]):
def export_to_srt(self, config: T_ExporterConfig | None = None) -> str:
...
def save_as_srt(self, name: str, output_dir: Path | str, config: T_ExporterConfig | None = None) -> Self:
...

View File

@@ -0,0 +1,66 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Self
from docutranslate.exporter.base import ExporterConfig
from docutranslate.exporter.srt.srt2html_exporter import Srt2HTMLExporterConfig, Srt2HTMLExporter
from docutranslate.exporter.srt.srt2srt_exporter import Srt2SrtExporter
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.srt_translator import SrtTranslatorConfig, SrtTranslator
from docutranslate.workflow.base import Workflow, WorkflowConfig
from docutranslate.workflow.interfaces import HTMLExportable, SrtExportable
@dataclass(kw_only=True)
class SrtWorkflowConfig(WorkflowConfig):
translator_config: SrtTranslatorConfig
html_exporter_config: Srt2HTMLExporterConfig
class SrtWorkflow(Workflow[SrtWorkflowConfig, Document, Document], HTMLExportable[Srt2HTMLExporterConfig],
SrtExportable[ExporterConfig]):
def __init__(self, config: SrtWorkflowConfig):
super().__init__(config=config)
if config.logger:
for sub_config in [self.config.translator_config]:
if sub_config:
sub_config.logger = config.logger
def _pre_translate(self,document_original:Document):
document = document_original.copy()
translate_config = self.config.translator_config
translator = SrtTranslator(translate_config)
return document,translator
def translate(self) -> Self:
document, translator=self._pre_translate(self.document_original)
translator.translate(document)
self.document_translated = document
return self
async def translate_async(self) -> Self:
document, translator = self._pre_translate(self.document_original)
await translator.translate_async(document)
self.document_translated = document
return self
def export_to_html(self, config: Srt2HTMLExporterConfig = None) -> str:
config = config or self.config.html_exporter_config
docu = self._export(Srt2HTMLExporter(config))
return docu.content.decode()
def export_to_srt(self, _: ExporterConfig | None = None) -> str:
docu = self._export(Srt2SrtExporter())
return docu.content.decode()
def save_as_html(self, name: str = None, output_dir: Path | str = "./output",
config: Srt2HTMLExporterConfig | None = None) -> Self:
config = config or self.config.html_exporter_config
self._save(exporter=Srt2HTMLExporter(config), name=name, output_dir=output_dir)
return self
def save_as_srt(self, name: str = None, output_dir: Path | str = "./output",
_: ExporterConfig | None = None) -> Self:
self._save(exporter=Srt2SrtExporter(), name=name, output_dir=output_dir)
return self