后端提前生成临时文件

This commit is contained in:
xunbu
2025-08-24 10:44:04 +08:00
parent 44c5408c6f
commit f1ccc0f307
2 changed files with 175 additions and 130 deletions

View File

@@ -4,7 +4,9 @@ import binascii
import io
import logging
import os
import shutil
import socket
import tempfile
import time
import uuid
from contextlib import asynccontextmanager, closing
@@ -85,6 +87,20 @@ WORKFLOW_DICT: Dict[str, Type[Workflow]] = {
"html": HtmlWorkflow,
}
# --- 媒体类型映射 ---
MEDIA_TYPES = {
"html": "text/html; charset=utf-8",
"markdown": "text/markdown; charset=utf-8",
"markdown_zip": "application/zip",
"txt": "text/plain; charset=utf-8",
"json": "application/json; charset=utf-8",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"csv": "text/csv; charset=utf-8",
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"srt": "text/plain; charset=utf-8",
"epub": "application/epub+zip",
}
# --- 辅助函数 ---
def _create_default_task_state() -> Dict[str, Any]:
@@ -92,10 +108,12 @@ def _create_default_task_state() -> Dict[str, Any]:
return {
"is_processing": False, "status_message": "空闲", "error_flag": False,
"download_ready": False,
"workflow_instance": None,
"workflow_instance": None, # 仅在处理期间使用
"original_filename_stem": None, "task_start_time": 0,
"task_end_time": 0, "current_task_ref": None,
"original_filename": None,
"temp_dir": None, # 用于存储临时文件的目录
"downloadable_files": {}, # 存储可下载文件的路径和名称
}
@@ -140,6 +158,15 @@ async def lifespan(app: FastAPI):
global_logger.setLevel(logging.INFO)
print("应用启动完成,多任务状态已初始化。")
yield
# 清理任何可能残留的临时目录
for task_id, task_state in tasks_state.items():
temp_dir = task_state.get("temp_dir")
if temp_dir and os.path.isdir(temp_dir):
try:
shutil.rmtree(temp_dir)
print(f"应用关闭,清理任务 '{task_id}' 的临时目录: {temp_dir}")
except Exception as e:
print(f"清理任务 '{task_id}' 的临时目录 '{temp_dir}' 时出错: {e}")
await httpx_client.aclose()
print("应用关闭,资源已清理。")
@@ -178,7 +205,7 @@ DocuTranslate 后端服务 API提供文档翻译、状态查询、结果下
4. **`GET /service/download/{{task_id}}/{{file_type}}`**: 任务完成后 (当 `download_ready` 为 `true` 时),通过此端点下载结果文件。
5. **`GET /service/content/{{task_id}}/{{file_type}}`**: 任务完成后(当 `download_ready` 为 `true` 时)以JSON格式获取文件内容。
6. **`POST /service/cancel/{{task_id}}`**: (可选) 取消一个正在进行的任务。
7. **`POST /service/release/{{task_id}}`**: (可选) 当任务不再需要时,释放其在服务器上占用的所有资源。
7. **`POST /service/release/{{task_id}}`**: (可选) 当任务不再需要时,释放其在服务器上占用的所有资源,包括临时文件
**版本**: {__version__}
""",
@@ -305,6 +332,8 @@ class HtmlWorkflowParams(BaseWorkflowParams):
" ",
description="当 insert_mode 为 'append''prepend' 时,用于分隔原文和译文的分隔符。"
)
# --- HTML WORKFLOW PARAMS END ---
@@ -464,6 +493,7 @@ async def _perform_translation(
task_logger.info(f"后台翻译任务开始: 文件 '{original_filename}', 工作流: '{payload.workflow_type}'")
task_state["status_message"] = f"正在处理 '{original_filename}'..."
temp_dir = None
try:
# 1. 根据工作流类型选择合适的 Workflow Class
@@ -623,17 +653,86 @@ async def _perform_translation(
workflow.read_bytes(content=file_contents, stem=file_stem, suffix=file_suffix)
await workflow.translate_async()
# 4. 任务成功,存储 workflow 实例并更新状态
# 4. 任务成功,生成所有可下载文件并存储
task_logger.info("翻译完成,正在生成结果文件...")
temp_dir = tempfile.mkdtemp(prefix=f"docutranslate_{task_id}_")
task_state["temp_dir"] = temp_dir
downloadable_files = {}
filename_stem = task_state['original_filename_stem']
# 检查CDN可用性
is_cdn_available = True
try:
await httpx_client.head("https://s4.zstatic.net/ajax/libs/KaTeX/0.16.9/contrib/auto-render.min.js",
timeout=3)
except (httpx.TimeoutException, httpx.RequestError):
is_cdn_available = False
task_logger.warning("CDN连接失败将使用本地JS进行渲染。")
# 定义导出函数映射
export_map = {}
# 根据 workflow 的类型填充导出映射
if isinstance(workflow, HTMLExportable):
html_config = None
if isinstance(workflow, MarkdownBasedWorkflow):
html_config = MD2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, TXTWorkflow):
html_config = TXT2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, JsonWorkflow):
html_config = Json2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, XlsxWorkflow):
html_config = Xlsx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, DocxWorkflow):
html_config = Docx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, SrtWorkflow):
html_config = Srt2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, EpubWorkflow):
html_config = Epub2HTMLExporterConfig(cdn=is_cdn_available)
export_map['html'] = (lambda: workflow.export_to_html(html_config), f"{filename_stem}_translated.html", True)
if isinstance(workflow, MDFormatsExportable):
export_map['markdown'] = (workflow.export_to_markdown, f"{filename_stem}_translated.md", True)
export_map['markdown_zip'] = (workflow.export_to_markdown_zip, f"{filename_stem}_translated.zip", False)
if isinstance(workflow, TXTExportable):
export_map['txt'] = (workflow.export_to_txt, f"{filename_stem}_translated.txt", True)
if isinstance(workflow, JsonExportable):
export_map['json'] = (workflow.export_to_json, f"{filename_stem}_translated.json", True)
if isinstance(workflow, XlsxExportable):
export_map['xlsx'] = (workflow.export_to_xlsx, f"{filename_stem}_translated.xlsx", False)
if isinstance(workflow, CsvExportable):
export_map['csv'] = (workflow.export_to_csv, f"{filename_stem}_translated.csv", False)
if isinstance(workflow, DocxExportable):
export_map['docx'] = (workflow.export_to_docx, f"{filename_stem}_translated.docx", False)
if isinstance(workflow, SrtExportable):
export_map['srt'] = (workflow.export_to_srt, f"{filename_stem}_translated.srt", True)
if isinstance(workflow, EpubExportable):
export_map['epub'] = (workflow.export_to_epub, f"{filename_stem}_translated.epub", False)
# 循环生成文件
for file_type, (export_func, filename, is_string_output) in export_map.items():
try:
task_logger.info(f"正在生成 {file_type} 文件: {filename}")
content = await asyncio.to_thread(export_func)
content_bytes = content.encode('utf-8') if is_string_output else content
file_path = os.path.join(temp_dir, filename)
with open(file_path, "wb") as f:
f.write(content_bytes)
downloadable_files[file_type] = {"path": file_path, "filename": filename}
task_logger.info(f"成功生成 {file_type} 文件于: {file_path}")
except Exception as export_error:
task_logger.error(f"生成 {file_type} 文件时出错: {export_error}", exc_info=True)
# 5. 任务成功,更新最终状态
end_time = time.time()
duration = end_time - task_state["task_start_time"]
task_state.update({
"workflow_instance": workflow,
"status_message": f"翻译成功!用时 {duration:.2f} 秒。",
"download_ready": True,
"error_flag": False,
"task_end_time": end_time,
"downloadable_files": downloadable_files,
})
task_logger.info(f"翻译成功完成,用时 {duration:.2f} 秒。")
task_logger.info(f"翻译成功完成,用时 {duration:.2f} 秒。所有结果文件已生成。")
except asyncio.CancelledError:
end_time = time.time()
@@ -641,7 +740,7 @@ async def _perform_translation(
task_logger.info(f"翻译任务 '{original_filename}' 已被取消 (用时 {duration:.2f} 秒).")
task_state.update({
"status_message": f"翻译任务已取消 (用时 {duration:.2f} 秒).", "error_flag": False, "download_ready": False,
"workflow_instance": None, "task_end_time": end_time,
"task_end_time": end_time,
})
except Exception as e:
end_time = time.time()
@@ -651,11 +750,19 @@ async def _perform_translation(
task_state.update({
"status_message": f"翻译过程中发生错误 (用时 {duration:.2f} 秒): {e}", "error_flag": True,
"download_ready": False,
"workflow_instance": None, "task_end_time": end_time,
"task_end_time": end_time,
})
finally:
# 无论成功失败,都清理内存中的 workflow 实例和临时目录(如果失败)
task_state["workflow_instance"] = None
task_state["is_processing"] = False
task_state["current_task_ref"] = None
if task_state["error_flag"] and temp_dir and os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
task_logger.info(f"因任务失败,已清理临时目录: {temp_dir}")
task_state["temp_dir"] = None
task_logger.info(f"后台翻译任务 '{original_filename}' 处理结束。")
task_logger.removeHandler(task_handler)
@@ -676,13 +783,18 @@ async def _start_translation_task(
if task_state["is_processing"] and task_state["current_task_ref"] and not task_state["current_task_ref"].done():
raise HTTPException(status_code=429, detail=f"任务ID '{task_id}' 正在进行中,请稍后再试。")
task_state["is_processing"] = True
# 如果存在旧的临时文件,先清理
if task_state.get("temp_dir") and os.path.isdir(task_state["temp_dir"]):
shutil.rmtree(task_state["temp_dir"])
task_state.update({
"is_processing": True,
"status_message": "任务初始化中...", "error_flag": False, "download_ready": False,
"workflow_instance": None,
"original_filename_stem": Path(original_filename).stem,
"original_filename": original_filename,
"task_start_time": time.time(), "task_end_time": 0, "current_task_ref": None,
"temp_dir": None, "downloadable_files": {},
})
log_history = tasks_log_histories[task_id]
@@ -791,7 +903,7 @@ async def service_cancel_translate(task_id: str):
@service_router.post(
"/release/{task_id}",
summary="释放任务资源",
description="""根据任务ID释放其在服务器上占用的所有资源包括状态、日志和缓存的翻译结果。如果任务正在进行会先尝试取消该任务。此操作不可逆。"""
description="""根据任务ID释放其在服务器上占用的所有资源包括状态、日志和缓存的翻译结果文件。如果任务正在进行,会先尝试取消该任务。此操作不可逆。"""
)
async def service_release_task(task_id: str):
if task_id not in tasks_state:
@@ -806,6 +918,18 @@ async def service_release_task(task_id: str):
except HTTPException as e:
print(f"[{task_id}] 取消任务时出现预期中的情况(可能已完成): {e.detail}")
message_parts.append(f"任务取消步骤已跳过(可能已完成或取消)。")
if task_state:
temp_dir = task_state.get("temp_dir")
if temp_dir and os.path.isdir(temp_dir):
try:
shutil.rmtree(temp_dir)
message_parts.append("临时文件已清理。")
print(f"[{task_id}] 临时目录 '{temp_dir}' 已被删除。")
except Exception as e:
message_parts.append(f"清理临时文件时出错: {e}")
print(f"[{task_id}] 删除临时目录 '{temp_dir}' 时出错: {e}")
tasks_state.pop(task_id, None)
tasks_log_queues.pop(task_id, None)
tasks_log_histories.pop(task_id, None)
@@ -935,27 +1059,9 @@ async def service_get_status(
raise HTTPException(status_code=404, detail=f"找不到任务ID '{task_id}'")
downloads = {}
if task_state.get("download_ready") and task_state.get("workflow_instance"):
workflow = task_state["workflow_instance"]
if isinstance(workflow, HTMLExportable):
downloads["html"] = f"/service/download/{task_id}/html"
if isinstance(workflow, MDFormatsExportable):
downloads["markdown"] = f"/service/download/{task_id}/markdown"
downloads["markdown_zip"] = f"/service/download/{task_id}/markdown_zip"
if isinstance(workflow, TXTExportable):
downloads["txt"] = f"/service/download/{task_id}/txt"
if isinstance(workflow, JsonExportable):
downloads["json"] = f"/service/download/{task_id}/json"
if isinstance(workflow, XlsxExportable):
downloads["xlsx"] = f"/service/download/{task_id}/xlsx"
if isinstance(workflow, CsvExportable):
downloads["csv"] = f"/service/download/{task_id}/csv"
if isinstance(workflow, DocxExportable):
downloads["docx"] = f"/service/download/{task_id}/docx"
if isinstance(workflow, SrtExportable):
downloads["srt"] = f"/service/download/{task_id}/srt"
if isinstance(workflow, EpubExportable):
downloads["epub"] = f"/service/download/{task_id}/epub"
if task_state.get("download_ready") and task_state.get("downloadable_files"):
for file_type in task_state["downloadable_files"].keys():
downloads[file_type] = f"/service/download/{task_id}/{file_type}"
return JSONResponse(content={
"task_id": task_id,
@@ -993,92 +1099,6 @@ async def service_get_logs(task_id: str):
FileType = Literal["markdown", "markdown_zip", "html", "txt", "json", "xlsx", "csv", "docx", "srt", "epub"]
async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple[bytes, str, str]:
"""辅助函数,从 workflow 获取内容、媒体类型和文件名"""
task_state = tasks_state.get(task_id)
if not task_state:
raise HTTPException(status_code=404, detail=f"找不到任务ID '{task_id}'")
if not task_state.get("download_ready") or not task_state.get("workflow_instance"):
raise HTTPException(status_code=404, detail="内容尚未准备好。")
workflow: Workflow = task_state["workflow_instance"]
filename_stem = task_state['original_filename_stem']
try:
content_bytes: bytes
media_type: str
filename: str
html_config = None
if file_type == 'html':
is_cdn_available = True
try:
await httpx_client.head("https://s4.zstatic.net/ajax/libs/KaTeX/0.16.9/contrib/auto-render.min.js",
timeout=3)
except (httpx.TimeoutException, httpx.RequestError):
is_cdn_available = False
workflow.config.logger.warning("CDN连接失败将使用本地JS进行渲染。")
if isinstance(workflow, MarkdownBasedWorkflow):
html_config = MD2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, TXTWorkflow):
html_config = TXT2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, JsonWorkflow):
html_config = Json2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, XlsxWorkflow):
html_config = Xlsx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, DocxWorkflow):
html_config = Docx2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, SrtWorkflow):
html_config = Srt2HTMLExporterConfig(cdn=is_cdn_available)
elif isinstance(workflow, EpubWorkflow):
html_config = Epub2HTMLExporterConfig(cdn=is_cdn_available)
# No special html_config for HtmlWorkflow as it doesn't use these preview-oriented features
if file_type == 'html' and isinstance(workflow, HTMLExportable):
content_str = await asyncio.to_thread(workflow.export_to_html, html_config)
content_bytes, media_type, filename = content_str.encode(
'utf-8'), "text/html; charset=utf-8", f"{filename_stem}_translated.html"
elif file_type == 'markdown' and isinstance(workflow, MDFormatsExportable):
md_content = workflow.export_to_markdown()
content_bytes, media_type, filename = md_content.encode(
'utf-8'), "text/markdown; charset=utf-8", f"{filename_stem}_translated.md"
elif file_type == 'markdown_zip' and isinstance(workflow, MDFormatsExportable):
content_bytes, media_type, filename = await asyncio.to_thread(
workflow.export_to_markdown_zip), "application/zip", f"{filename_stem}_translated.zip"
elif file_type == 'txt' and isinstance(workflow, TXTExportable):
txt_content = await asyncio.to_thread(workflow.export_to_txt)
content_bytes, media_type, filename = txt_content.encode(
'utf-8'), "text/plain; charset=utf-8", f"{filename_stem}_translated.txt"
elif file_type == 'json' and isinstance(workflow, JsonExportable):
json_content = await asyncio.to_thread(workflow.export_to_json)
content_bytes, media_type, filename = json_content.encode(
'utf-8'), "application/json; charset=utf-8", f"{filename_stem}_translated.json"
elif file_type == 'xlsx' and isinstance(workflow, XlsxExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_xlsx)
media_type, filename = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", f"{filename_stem}_translated.xlsx"
elif file_type == 'csv' and isinstance(workflow, CsvExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_csv)
media_type, filename = "text/csv; charset=utf-8", f"{filename_stem}_translated.csv"
elif file_type == 'docx' and isinstance(workflow, DocxExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_docx)
media_type, filename = "application/vnd.openxmlformats-officedocument.wordprocessingml.document", f"{filename_stem}_translated.docx"
elif file_type == 'srt' and isinstance(workflow, SrtExportable):
content_str = await asyncio.to_thread(workflow.export_to_srt)
content_bytes, media_type, filename = content_str.encode(
'utf-8'), "text/plain; charset=utf-8", f"{filename_stem}_translated.srt"
elif file_type == 'epub' and isinstance(workflow, EpubExportable):
content_bytes = await asyncio.to_thread(workflow.export_to_epub)
media_type, filename = "application/epub+zip", f"{filename_stem}_translated.epub"
else:
raise HTTPException(status_code=404, detail=f"此任务不支持导出 '{file_type}' 类型的文件。")
return content_bytes, media_type, filename
except Exception as e:
workflow.config.logger.error(f"导出 {file_type} 时出错: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"导出 {file_type} 时发生内部错误: {e}")
@service_router.get(
"/download/{task_id}/{file_type}",
summary="下载翻译结果文件",
@@ -1100,8 +1120,8 @@ async def _get_content_from_workflow(task_id: str, file_type: FileType) -> tuple
"schema": {"type": "string", "format": "binary"}},
}
},
404: {"description": "任务ID不存在或该任务不支持所请求的文件类型。"},
500: {"description": "在服务器上生成文件时发生内部错误。"}
404: {"description": "任务ID不存在或该任务不支持所请求的文件类型,或临时文件已丢失"},
500: {"description": "在服务器上读取文件时发生内部错误。"}
}
)
async def service_download_file(
@@ -1109,9 +1129,20 @@ async def service_download_file(
file_type: FileType = FastApiPath(..., description="要下载的文件类型。",
examples=["html", "json", "csv", "docx", "srt", "epub"])
):
content, media_type, filename = await _get_content_from_workflow(task_id, file_type)
headers = {"Content-Disposition": f"attachment; filename*=UTF-8''{quote(filename, safe='', encoding='utf-8')}"}
return StreamingResponse(io.BytesIO(content), media_type=media_type, headers=headers)
task_state = tasks_state.get(task_id)
if not task_state:
raise HTTPException(status_code=404, detail=f"找不到任务ID '{task_id}'")
file_info = task_state.get("downloadable_files", {}).get(file_type)
if not file_info or not os.path.exists(file_info.get("path")):
raise HTTPException(status_code=404,
detail=f"任务 '{task_id}' 不支持下载 '{file_type}' 类型的文件,或文件已丢失。")
file_path = file_info["path"]
filename = file_info["filename"]
media_type = MEDIA_TYPES.get(file_type, "application/octet-stream")
return FileResponse(path=file_path, media_type=media_type, filename=filename)
@service_router.get(
@@ -1153,8 +1184,8 @@ async def service_download_file(
}
}}}
},
404: {"description": "任务ID不存在或该任务不支持所请求的文件类型。"},
500: {"description": "在服务器上生成文件时发生内部错误。"}
404: {"description": "任务ID不存在或该任务不支持所请求的文件类型,或临时文件已丢失"},
500: {"description": "在服务器上读取文件时发生内部错误。"}
}
)
async def service_content(
@@ -1162,15 +1193,28 @@ async def service_content(
file_type: FileType = FastApiPath(..., description="要获取内容的文件类型。",
examples=["html", "json", "csv", "docx", "srt", "epub"])
):
content, _, filename = await _get_content_from_workflow(task_id, file_type)
task_state = tasks_state.get(task_id)
if not task_state:
raise HTTPException(status_code=404, detail=f"找不到任务ID '{task_id}'")
final_content = base64.b64encode(content).decode('utf-8')
file_info = task_state.get("downloadable_files", {}).get(file_type)
if not file_info or not os.path.exists(file_info.get("path")):
raise HTTPException(status_code=404, detail=f"任务 '{task_id}' 不支持获取 '{file_type}' 类型的内容,或文件已丢失。")
return JSONResponse(content={
"file_type": file_type,
"filename": filename,
"content": final_content
})
file_path = file_info["path"]
filename = file_info["filename"]
try:
with open(file_path, "rb") as f:
content_bytes = f.read()
final_content = base64.b64encode(content_bytes).decode('utf-8')
return JSONResponse(content={
"file_type": file_type,
"filename": filename,
"content": final_content
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取文件时发生内部错误: {e}")
# ===================================================================