txt翻译支持设置插入模式

This commit is contained in:
xunbu
2025-09-03 18:27:48 +08:00
parent cfbf7d6f13
commit a70700252e
7 changed files with 276 additions and 183 deletions

View File

@@ -1,3 +1,3 @@
# SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0
# Package version string; single source of truth for the release number.
__version__ = "1.3.3"

View File

@@ -25,26 +25,26 @@ class SegmentsTranslateAgent(Agent):
def __init__(self, config: SegmentsTranslateAgentConfig):
super().__init__(config)
self.system_prompt = f"""
Role
# Role
You are a professional machine translation engine.
Task
# Task
You will receive a sequence of segments to be translated, represented in JSON format. The keys are the segment IDs, and the values are the segments for translation.
You need to translate these segments into the target language.
Target language: {config.to_lang}
Requirements
# Requirements
The translation must be professional and accurate.
Do not output any explanations or annotations.
The format of the translated segments should be as close as possible to the source format.
For personal names and proper nouns, use the most commonly used words for translation. If there are multiple common translations, choose the word that comes first in dictionary order.
For special tags or other non-translatable elements (like codes, brand names, specific jargon), keep them in their original form.
If a segment is already in the target language, keep it as is.
Output
# Output
The translated sequence of segments, represented as JSON text (note: not a code block). The keys are the segment IDs, and the values are the translated segments.
The returned JSON text must be parsable by json.loads into a dictionary of the form {r'{"segment_id": "translation"}'}.
Example
Input
# Example
## Input
{r'{"0":"hello","1":"apple","2":true,"3":"false"}'}
Output
## Output
{r'{"0":"你好","1":"苹果","2":true,"3":"错误"}'}
Warning: Never wrap the entire JSON object in quotes to make it a single string. Never wrap the JSON text in ```.
"""

View File

@@ -1,59 +0,0 @@
# SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0
from dataclasses import dataclass
from docutranslate.agents import AgentConfig, Agent
from docutranslate.glossary.glossary import Glossary
@dataclass
class TXTTranslateAgentConfig(AgentConfig):
    """Settings for the plain-text translation agent, extending ``AgentConfig``."""
    # Target language; interpolated into the agent's system prompt.
    to_lang: str
    # Optional extra instructions; appended to the system prompt when non-empty.
    custom_prompt: str | None = None
    # Optional glossary mapping handed to ``Glossary`` before each request.
    # Presumably source term -> preferred translation; confirm against Glossary.
    glossary_dict: dict[str, str] | None = None
class TXTTranslateAgent(Agent):
    """Agent that translates plain-text chunks with a fixed system prompt.

    The prompt can be extended with caller-supplied rules and, per request,
    with glossary text produced by ``Glossary``.
    """

    def __init__(self, config: TXTTranslateAgentConfig):
        """Build the system prompt from *config* and remember the glossary."""
        super().__init__(config)
        self.system_prompt = f"""
# Role
You are a professional machine translation engine.
# Task
Translate the input txt text.
Target language: {config.to_lang}
# Requirements
- The translation must be professional and accurate.
- Do not output any explanations or annotations.
- Do not change placeholders in the format of `<ph-xxxxxx>`.
- For personal names and proper nouns, use the most commonly used words for translation. If there are multiple common translations, choose the word that comes first in dictionary order.
- For special tags or other non-translatable elements (like codes, brand names, specific jargon), keep them in their original form.
# Output
The translated txt text as plain text.
"""
        extra_rules = config.custom_prompt
        self.custom_prompt = extra_rules
        if extra_rules:
            # Caller-supplied rules go last, under their own heading.
            self.system_prompt = (
                self.system_prompt
                + "\n# **Important rules or background** \n"
                + extra_rules
                + '\n'
            )
        self.glossary_dict = config.glossary_dict

    def _pre_send_handler(self, system_prompt, prompt):
        """Return ``(system_prompt, prompt)``, extending the system prompt when a glossary is set."""
        if not self.glossary_dict:
            return system_prompt, prompt
        # The appended text is whatever Glossary derives from this prompt.
        glossary_text = Glossary(glossary_dict=self.glossary_dict).append_system_prompt(prompt)
        return system_prompt + glossary_text, prompt

    def send_chunks(self, prompts: list[str]):
        """Send *prompts* through the base agent with the glossary hook attached."""
        hook = self._pre_send_handler
        return super().send_prompts(prompts=prompts, pre_send_handler=hook)

    async def send_chunks_async(self, prompts: list[str]):
        """Asynchronous counterpart of :meth:`send_chunks`."""
        hook = self._pre_send_handler
        return await super().send_prompts_async(prompts=prompts, pre_send_handler=hook)

    def update_glossary_dict(self, update_dict: dict | None):
        """Merge *update_dict* into the glossary; existing entries win on key clashes."""
        current = {} if self.glossary_dict is None else self.glossary_dict
        if update_dict is None:
            self.glossary_dict = current
        else:
            # Right-hand operand takes precedence, so current entries are kept.
            self.glossary_dict = update_dict | current

View File

@@ -259,12 +259,6 @@ class BaseWorkflowParams(BaseModel):
glossary_agent_config: Optional[GlossaryAgentConfigPayload] = Field(None,
description="用于术语表生成的Agent的配置。如果 `glossary_generate_enable` 为 `True`,此项必填。")
@field_validator('glossary_agent_config')
def check_glossary_config(cls, v, values):
    # Field-level check: reject a falsy `glossary_agent_config` when the
    # already-validated data has `glossary_generate_enable` set.
    # `values.data` is read with .get(), so a missing flag counts as disabled.
    # NOTE(review): pydantic normally skips field validators for fields left at
    # their default unless `validate_default` is enabled — confirm this check
    # actually fires when the config is omitted.
    if values.data.get('glossary_generate_enable') and not v:
        raise ValueError("当 `glossary_generate_enable` 为 `True` 时, `glossary_agent_config` 字段是必须的。")
    return v
@model_validator(mode='before')
@classmethod
def check_translation_fields(cls, values):
@@ -279,6 +273,15 @@ class BaseWorkflowParams(BaseModel):
# 如果跳过翻译,则不进行任何检查,允许 base_url 等字段为空
return values
@model_validator(mode='after')
def check_glossary_config(self) -> 'BaseWorkflowParams':
    """Cross-field consistency check for the glossary settings.

    Runs on the constructed model: when glossary generation is enabled, a
    glossary agent configuration must be present.

    Returns:
        The model instance itself.

    Raises:
        ValueError: generation is enabled but no agent config was supplied.
    """
    # Generation disabled: nothing to verify.
    if not self.glossary_generate_enable:
        return self
    # Generation enabled and a config is present: consistent.
    if self.glossary_agent_config:
        return self
    raise ValueError("当 `glossary_generate_enable` 为 `True` 时, `glossary_agent_config` 字段是必须的。")
# 2. 为每个工作流创建独立的参数模型
class MarkdownWorkflowParams(BaseWorkflowParams):
@@ -303,6 +306,14 @@ class MarkdownWorkflowParams(BaseWorkflowParams):
class TextWorkflowParams(BaseWorkflowParams):
    # Request parameters for the plain-text ('txt') translation workflow.
    # Documented with comments rather than a docstring so the generated schema
    # description of the model is left unchanged.

    # Discriminator value selecting this workflow.
    workflow_type: Literal['txt'] = Field(..., description="指定使用纯文本的翻译工作流。")
    # How each translated line is combined with its source line.
    insert_mode: Literal["replace", "append", "prepend"] = Field(
        "replace",
        description="翻译文本的插入模式。'replace':替换原文,'append':附加到原文后,'prepend':附加到原文前。"
    )
    # Separator placed between source and translation in append/prepend mode.
    # The description previously ran the two mode names together
    # ('append''prepend'); the missing conjunction is restored.
    separator: str = Field(
        "\n",
        description="当 insert_mode 为 'append' 或 'prepend' 时,用于分隔原文和译文的分隔符。"
    )
class JsonWorkflowParams(BaseWorkflowParams):
@@ -616,7 +627,8 @@ async def _perform_translation(
task_logger.info("构建 TXTWorkflow 配置。")
translator_args = payload.model_dump(include={
'skip_translate', 'base_url', 'api_key', 'model_id', 'to_lang', 'custom_prompt',
'temperature', 'thinking', 'chunk_size', 'concurrent', 'glossary_dict'
'temperature', 'thinking', 'chunk_size', 'concurrent', 'glossary_dict',
'insert_mode', 'separator'
}, exclude_none=True)
translator_args['glossary_generate_enable'] = payload.glossary_generate_enable
translator_args['glossary_agent_config'] = build_glossary_agent_config()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,26 +1,53 @@
# SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0
import asyncio
from dataclasses import dataclass
from typing import Self
from typing import Self, Literal, List
from docutranslate.agents.txt_agent import TXTTranslateAgent, TXTTranslateAgentConfig
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator
from docutranslate.utils.markdown_splitter import split_markdown_text
@dataclass
class TXTTranslatorConfig(AiTranslatorConfig):
    """Configuration for ``TXTTranslator``.

    Attributes:
        insert_mode: How a translated line is combined with its source line.
            - "replace": the translation replaces the source line.
            - "append": the translation is placed after the source line.
            - "prepend": the translation is placed before the source line.
            Defaults to "replace".
        separator: String inserted between the source line and the
            translation in "append" and "prepend" modes.
            Defaults to a newline ("\\n").
    """
    insert_mode: Literal["replace", "append", "prepend"] = "replace"
    separator: str = "\n"
class TXTTranslator(AiTranslator):
"""
一个用于翻译纯文本 (.txt) 文件的翻译器。
它会按行读取文件内容,对每一行进行翻译,然后根据配置将译文写回。
"""
def __init__(self, config: TXTTranslatorConfig):
"""
初始化 TXTTranslator。
Args:
config (TxtTranslatorConfig): 翻译器的配置。
"""
super().__init__(config=config)
self.chunk_size = config.chunk_size
self.translate_agent = None
if not self.skip_translate:
agent_config = TXTTranslateAgentConfig(custom_prompt=config.custom_prompt,
agent_config = SegmentsTranslateAgentConfig(
custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
baseurl=config.base_url,
key=config.api_key,
@@ -30,41 +57,148 @@ class TXTTranslator(AiTranslator):
max_concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger,
glossary_dict=config.glossary_dict)
self.translate_agent = TXTTranslateAgent(agent_config)
glossary_dict=config.glossary_dict
)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode
self.separator = config.separator
def _pre_translate(self, document: Document) -> List[str]:
    """Decode the document and split it into lines.

    Args:
        document: Document whose ``content`` holds the raw TXT bytes.

    Returns:
        The source lines without line terminators; blank lines are kept.
        An empty list is returned when the content cannot be decoded.
    """
    try:
        # 'utf-8-sig' drops a leading BOM if one is present.
        decoded = document.content.decode("utf-8-sig")
    except (UnicodeDecodeError, AttributeError) as exc:
        self.logger.error(f"无法解码TXT文件内容请确保文件编码为UTF-8: {exc}")
        return []
    else:
        # Blank lines stay in the list because they may be part of the layout.
        return decoded.splitlines()
def _after_translate(self, translated_texts: List[str], original_texts: List[str]) -> bytes:
"""
翻译后处理步骤将译文根据配置模式与原文合并并生成新的TXT文件内容。
Args:
translated_texts (List[str]): 翻译后的文本行列表。
original_texts (List[str]): 原始文本行列表。
Returns:
bytes: 新的TXT文件内容的字节流。
"""
processed_lines = []
for i, original_text in enumerate(original_texts):
# 如果原文是空行或仅包含空白字符,则直接保留,不进行翻译处理
if not original_text.strip():
processed_lines.append(original_text)
continue
translated_text = translated_texts[i]
# 根据插入模式更新内容
if self.insert_mode == "replace":
processed_lines.append(translated_text)
elif self.insert_mode == "append":
# strip() 避免在原文和译文间产生多余的空白
processed_lines.append(original_text.strip() + self.separator + translated_text.strip())
elif self.insert_mode == "prepend":
processed_lines.append(translated_text.strip() + self.separator + original_text.strip())
else:
self.logger.error(f"不正确的TxtTranslatorConfig参数: insert_mode='{self.insert_mode}'")
# 默认回退到替换模式,避免程序中断
processed_lines.append(translated_text)
# 将所有处理后的行重新合成为一个字符串,以换行符分隔
new_txt_content_str = "\n".join(processed_lines)
# 返回UTF-8编码的字节流
return new_txt_content_str.encode('utf-8')
def translate(self, document: Document) -> Self:
    """Translate a TXT document synchronously, line by line.

    The document is split into lines, non-blank lines are sent to the
    translation agent, and the result is merged back according to the
    configured insert mode. ``document.content`` is replaced in place.

    Args:
        document: Document to translate.

    Returns:
        This translator instance, to allow call chaining.
    """
    original_texts = self._pre_translate(document)
    if not original_texts:
        self.logger.info("\n文件中没有找到需要翻译的文本内容。")
        return self

    # Whitespace-only lines are never sent to the model.
    texts_to_translate = [text for text in original_texts if text.strip()]

    # Step 1 (optional): glossary extraction, fed into the translate agent.
    if self.glossary_agent and texts_to_translate:
        self.glossary_dict_gen = self.glossary_agent.send_segments(texts_to_translate, self.chunk_size)
        if self.translate_agent:
            self.translate_agent.update_glossary_dict(self.glossary_dict_gen)

    # Without an agent (translation skipped) or without translatable lines
    # there is nothing to merge. Leave the document untouched so that
    # append/prepend modes do not duplicate every source line.
    if not (self.translate_agent and texts_to_translate):
        return self

    # Step 2: translate, then map results back onto the original lines.
    translated_segments = self.translate_agent.send_segments(texts_to_translate, self.chunk_size)
    translated_texts_map = dict(zip(texts_to_translate, translated_segments))
    # Lines without a translation (blank ones, or any the agent did not
    # return) fall back to the source text.
    final_translated_texts = [translated_texts_map.get(text, text) for text in original_texts]

    # Step 3: merge according to the insert mode and store the new content.
    document.content = self._after_translate(final_translated_texts, original_texts)
    return self
async def translate_async(self, document: Document) -> Self:
    """Translate a TXT document asynchronously, line by line.

    Mirrors the synchronous flow: split into lines, translate the non-blank
    ones, merge according to the insert mode, and replace
    ``document.content`` in place. Pre- and post-processing run in a worker
    thread via ``asyncio.to_thread``.

    Args:
        document: Document to translate.

    Returns:
        This translator instance, to allow call chaining.
    """
    original_texts = await asyncio.to_thread(self._pre_translate, document)
    if not original_texts:
        self.logger.info("\n文件中没有找到需要翻译的文本内容。")
        return self

    # Whitespace-only lines are never sent to the model.
    texts_to_translate = [text for text in original_texts if text.strip()]

    # Step 1 (optional): glossary extraction, fed into the translate agent.
    if self.glossary_agent and texts_to_translate:
        self.glossary_dict_gen = await self.glossary_agent.send_segments_async(texts_to_translate, self.chunk_size)
        if self.translate_agent:
            self.translate_agent.update_glossary_dict(self.glossary_dict_gen)

    # Without an agent (translation skipped) or without translatable lines
    # there is nothing to merge. Leave the document untouched so that
    # append/prepend modes do not duplicate every source line.
    if not (self.translate_agent and texts_to_translate):
        return self

    # Step 2: translate, then map results back onto the original lines.
    translated_segments = await self.translate_agent.send_segments_async(texts_to_translate, self.chunk_size)
    translated_texts_map = dict(zip(texts_to_translate, translated_segments))
    # Lines without a translation (blank ones, or any the agent did not
    # return) fall back to the source text.
    final_translated_texts = [translated_texts_map.get(text, text) for text in original_texts]

    # Step 3: merge according to the insert mode and store the new content.
    document.content = await asyncio.to_thread(
        self._after_translate, final_translated_texts, original_texts
    )
    return self