txt翻译支持按连续文本分段

This commit is contained in:
xunbu
2025-11-09 10:35:59 +08:00
parent 17d1e8476b
commit 11ff6aa58c
4 changed files with 336 additions and 117 deletions

View File

@@ -207,7 +207,7 @@ async def lifespan(app: FastAPI):
global_logger.propagate = False global_logger.propagate = False
global_logger.setLevel(logging.INFO) global_logger.setLevel(logging.INFO)
print("应用启动完成,多任务状态已初始化。") print("应用启动完成,多任务状态已初始化。")
print(f"服务接口文档: http://127.0.0.1:{app.state.port_to_use}/docs") print(f"服务接口文档: http://1227.0.0.1:{app.state.port_to_use}/docs")
print(f"请用浏览器访问 http://127.0.0.1:{app.state.port_to_use}\n") print(f"请用浏览器访问 http://127.0.0.1:{app.state.port_to_use}\n")
yield yield
# 清理任何可能残留的临时目录 # 清理任何可能残留的临时目录
@@ -469,6 +469,10 @@ class TextWorkflowParams(BaseWorkflowParams):
"\n", "\n",
description="当 insert_mode 为 'append''prepend' 时,用于分隔原文和译文的分隔符。", description="当 insert_mode 为 'append''prepend' 时,用于分隔原文和译文的分隔符。",
) )
segment_mode: Literal["line", "paragraph"] = Field(
"line",
description="分段模式。'line':按行分段(每行独立翻译),'paragraph':按段落分段(连续非空行合并为段落)。",
)
class JsonWorkflowParams(BaseWorkflowParams): class JsonWorkflowParams(BaseWorkflowParams):
@@ -961,6 +965,7 @@ async def _perform_translation(
"glossary_dict", "glossary_dict",
"insert_mode", "insert_mode",
"separator", "separator",
"segment_mode",
"timeout", "timeout",
"retry", "retry",
"system_proxy_enable", "system_proxy_enable",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -24,15 +24,21 @@ class TXTTranslatorConfig(AiTranslatorConfig):
separator (str): separator (str):
"append""prepend" 模式下,用于分隔原文和译文的字符串。 "append""prepend" 模式下,用于分隔原文和译文的字符串。
默认为换行符 "\n" 默认为换行符 "\n"
segment_mode (Literal["line", "paragraph"]):
分段模式。
- "line": 按行分段(每行独立翻译)
- "paragraph": 按段落分段(连续非空行合并为段落)
默认为 "paragraph"
""" """
insert_mode: Literal["replace", "append", "prepend"] = "replace" insert_mode: Literal["replace", "append", "prepend"] = "replace"
separator: str = "\n" separator: str = "\n"
segment_mode: Literal["line", "paragraph"] = "line"
class TXTTranslator(AiTranslator): class TXTTranslator(AiTranslator):
""" """
一个用于翻译纯文本 (.txt) 文件的翻译器。 一个用于翻译纯文本 (.txt) 文件的翻译器。
它会按行读取文件内容,对每一行进行翻译,然后根据配置将译文写回 支持按行或按段落两种分段模式进行翻译
""" """
def __init__(self, config: TXTTranslatorConfig): def __init__(self, config: TXTTranslatorConfig):
@@ -40,7 +46,7 @@ class TXTTranslator(AiTranslator):
初始化 TXTTranslator。 初始化 TXTTranslator。
Args: Args:
config (TxtTranslatorConfig): 翻译器的配置。 config (TXTTranslatorConfig): 翻译器的配置。
""" """
super().__init__(config=config) super().__init__(config=config)
self.chunk_size = config.chunk_size self.chunk_size = config.chunk_size
@@ -64,16 +70,17 @@ class TXTTranslator(AiTranslator):
self.translate_agent = SegmentsTranslateAgent(agent_config) self.translate_agent = SegmentsTranslateAgent(agent_config)
self.insert_mode = config.insert_mode self.insert_mode = config.insert_mode
self.separator = config.separator self.separator = config.separator
self.segment_mode = config.segment_mode
def _pre_translate(self, document: Document) -> List[str]: def _pre_translate(self, document: Document) -> List[str]:
""" """
预处理步骤解析TXT文件,按行分割文本 预处理步骤:根据分段模式解析TXT文件。
Args: Args:
document (Document): 待处理的文档对象。 document (Document): 待处理的文档对象。
Returns: Returns:
List[str]: 待翻译的原文文本列表。 List[str]: 分段后的文本列表。
""" """
try: try:
# 使用 utf-8-sig 解码以处理可能存在的BOM (Byte Order Mark) # 使用 utf-8-sig 解码以处理可能存在的BOM (Byte Order Mark)
@@ -82,49 +89,146 @@ class TXTTranslator(AiTranslator):
self.logger.error(f"无法解码TXT文件内容请确保文件编码为UTF-8: {e}") self.logger.error(f"无法解码TXT文件内容请确保文件编码为UTF-8: {e}")
return [] return []
# 按行分割文本,并保留空行,因为它们可能是格式的一部分 if self.segment_mode == "line":
original_texts = txt_content.splitlines() return self._segment_by_line(txt_content)
else: # paragraph mode
return self._segment_by_paragraph(txt_content)
return original_texts def _segment_by_line(self, txt_content: str) -> List[str]:
"""
按行分段模式:每行作为独立分段。
Args:
txt_content (str): 文本内容
Returns:
List[str]: 按行分段的文本列表
"""
# 简单按行分割,保留所有行(包括空行)
return txt_content.splitlines()
def _segment_by_paragraph(self, txt_content: str) -> List[str]:
"""
按段落分段模式:连续非空行合并为段落,空行单独处理。
Args:
txt_content (str): 文本内容
Returns:
List[str]: 按段落分段的文本列表
"""
lines = txt_content.splitlines()
segments = [] # 每个元素要么是文本段落,要么是空行标记
i = 0
while i < len(lines):
if lines[i].strip(): # 非空行 → 文本段落
# 收集连续的非空行
paragraph_lines = []
while i < len(lines) and lines[i].strip():
paragraph_lines.append(lines[i])
i += 1
segments.append("\n".join(paragraph_lines))
else: # 空行 → 空行标记
# 收集连续的空行
empty_lines = []
while i < len(lines) and not lines[i].strip():
empty_lines.append(lines[i])
i += 1
# 用特殊标记表示空行组(保持数量信息)
segments.append(f"@@EMPTY_LINES_{len(empty_lines)}@@")
return segments
def _after_translate(self, translated_texts: List[str], original_texts: List[str]) -> bytes: def _after_translate(self, translated_texts: List[str], original_texts: List[str]) -> bytes:
""" """
翻译后处理步骤:将译文根据配置模式与原文合并并生成新的TXT文件内容 翻译后处理步骤:根据分段模式重建文档
Args: Args:
translated_texts (List[str]): 翻译后的文本列表。 translated_texts (List[str]): 翻译后的文本列表。
original_texts (List[str]): 原始文本列表。 original_texts (List[str]): 原始文本列表。
Returns: Returns:
bytes: 新的TXT文件内容的字节流。 bytes: 新的TXT文件内容的字节流。
""" """
if self.segment_mode == "line":
return self._reconstruct_by_line(translated_texts, original_texts)
else: # paragraph mode
return self._reconstruct_by_paragraph(translated_texts, original_texts)
def _reconstruct_by_line(self, translated_texts: List[str], original_lines: List[str]) -> bytes:
"""
按行模式重建文档。
Args:
translated_texts (List[str]): 翻译后的行列表
original_lines (List[str]): 原始行列表
Returns:
bytes: 重建的文档内容
"""
processed_lines = [] processed_lines = []
for i, original_text in enumerate(original_texts): for i, original_line in enumerate(original_lines):
# 如果原文是空行或仅包含空白字符,则直接保留,不进行翻译处理 # 如果是空行,直接保留
if not original_text.strip(): if not original_line.strip():
processed_lines.append(original_text) processed_lines.append(original_line)
continue continue
translated_text = translated_texts[i] translated_line = translated_texts[i]
# 根据插入模式更新内容 # 根据插入模式更新内容
if self.insert_mode == "replace": if self.insert_mode == "replace":
processed_lines.append(translated_text) processed_lines.append(translated_line)
elif self.insert_mode == "append": elif self.insert_mode == "append":
# strip() 避免在原文和译文间产生多余的空白 processed_lines.append(original_line.strip() + self.separator + translated_line.strip())
processed_lines.append(original_text.strip() + self.separator + translated_text.strip())
elif self.insert_mode == "prepend": elif self.insert_mode == "prepend":
processed_lines.append(translated_text.strip() + self.separator + original_text.strip()) processed_lines.append(translated_line.strip() + self.separator + original_line.strip())
else: else:
self.logger.error(f"不正确的TxtTranslatorConfig参数: insert_mode='{self.insert_mode}'") self.logger.error(f"不正确的insert_mode参数: '{self.insert_mode}'")
# 默认回退到替换模式,避免程序中断 processed_lines.append(translated_line)
processed_lines.append(translated_text)
# 将所有处理后的行重新合成为一个字符串,以换行符分隔 return "\n".join(processed_lines).encode('utf-8')
new_txt_content_str = "\n".join(processed_lines)
# 返回UTF-8编码的字节流 def _reconstruct_by_paragraph(self, translated_texts: List[str], original_segments: List[str]) -> bytes:
return new_txt_content_str.encode('utf-8') """
按段落模式重建文档。
Args:
translated_texts (List[str]): 翻译后的段落列表
original_segments (List[str]): 原始分段列表
Returns:
bytes: 重建的文档内容
"""
result_lines = []
translated_index = 0
for segment in original_segments:
# 处理空行组
if segment.startswith("@@EMPTY_LINES_"):
empty_count = int(segment.split('_')[-2]) # 提取空行数量
result_lines.extend([""] * empty_count)
continue
# 处理文本段落
if translated_index < len(translated_texts):
translated_text = translated_texts[translated_index]
translated_index += 1
# 根据插入模式处理
if self.insert_mode == "replace":
result_lines.append(translated_text)
elif self.insert_mode == "append":
result_lines.append(segment + self.separator + translated_text)
elif self.insert_mode == "prepend":
result_lines.append(translated_text + self.separator + segment)
else:
result_lines.append(translated_text)
else:
# 理论上不会发生,但安全处理
result_lines.append(segment)
return "\n".join(result_lines).encode('utf-8')
def translate(self, document: Document) -> Self: def translate(self, document: Document) -> Self:
""" """
@@ -136,14 +240,17 @@ class TXTTranslator(AiTranslator):
Returns: Returns:
Self: 返回翻译器实例,以支持链式调用。 Self: 返回翻译器实例,以支持链式调用。
""" """
original_texts = self._pre_translate(document) original_segments = self._pre_translate(document)
if not original_texts: if not original_segments:
self.logger.info("\n文件中没有找到需要翻译的文本内容。") self.logger.info("\n文件中没有找到需要翻译的文本内容。")
return self return self
# 过滤掉仅包含空白字符的行避免不必要的翻译API调用 # 过滤出需要翻译的文本段(非空行标记)
texts_to_translate = [text for text in original_texts if text.strip()] if self.segment_mode == "line":
texts_to_translate = [text for text in original_segments if text.strip()]
else: # paragraph mode
texts_to_translate = [text for text in original_segments if not text.startswith("@@EMPTY_LINES_")]
# --- 步骤 1: (可选) 术语提取 --- # --- 步骤 1: (可选) 术语提取 ---
if self.glossary_agent and texts_to_translate: if self.glossary_agent and texts_to_translate:
@@ -157,11 +264,26 @@ class TXTTranslator(AiTranslator):
translated_segments = self.translate_agent.send_segments(texts_to_translate, self.chunk_size) translated_segments = self.translate_agent.send_segments(texts_to_translate, self.chunk_size)
translated_texts_map = dict(zip(texts_to_translate, translated_segments)) translated_texts_map = dict(zip(texts_to_translate, translated_segments))
# 将翻译结果映射回原始行列表,非翻译行保持不变 # 将翻译结果映射回原始分段列表
final_translated_texts = [translated_texts_map.get(text, text) for text in original_texts] final_translated_texts = []
for segment in original_segments:
if self.segment_mode == "line":
# 行模式:空行保留,非空行翻译
if segment.strip() and segment in translated_texts_map:
final_translated_texts.append(translated_texts_map[segment])
else:
final_translated_texts.append(segment)
else:
# 段落模式:空行标记保留,文本段落翻译
if segment.startswith("@@EMPTY_LINES_"):
final_translated_texts.append(segment) # 空行标记原样保留
elif segment in translated_texts_map:
final_translated_texts.append(translated_texts_map[segment])
else:
final_translated_texts.append(segment)
# --- 步骤 3: 后处理并更新文档内容 --- # --- 步骤 3: 后处理并更新文档内容 ---
document.content = self._after_translate(final_translated_texts, original_texts) document.content = self._after_translate(final_translated_texts, original_segments)
return self return self
async def translate_async(self, document: Document) -> Self: async def translate_async(self, document: Document) -> Self:
@@ -175,14 +297,17 @@ class TXTTranslator(AiTranslator):
Self: 返回翻译器实例,以支持链式调用。 Self: 返回翻译器实例,以支持链式调用。
""" """
# I/O密集型操作在线程中运行 # I/O密集型操作在线程中运行
original_texts = await asyncio.to_thread(self._pre_translate, document) original_segments = await asyncio.to_thread(self._pre_translate, document)
if not original_texts: if not original_segments:
self.logger.info("\n文件中没有找到需要翻译的文本内容。") self.logger.info("\n文件中没有找到需要翻译的文本内容。")
return self return self
# 过滤掉仅包含空白字符的行 # 过滤出需要翻译的文本段
texts_to_translate = [text for text in original_texts if text.strip()] if self.segment_mode == "line":
texts_to_translate = [text for text in original_segments if text.strip()]
else: # paragraph mode
texts_to_translate = [text for text in original_segments if not text.startswith("@@EMPTY_LINES_")]
# --- 步骤 1: (可选) 术语提取 (异步) --- # --- 步骤 1: (可选) 术语提取 (异步) ---
if self.glossary_agent and texts_to_translate: if self.glossary_agent and texts_to_translate:
@@ -196,11 +321,24 @@ class TXTTranslator(AiTranslator):
translated_segments = await self.translate_agent.send_segments_async(texts_to_translate, self.chunk_size) translated_segments = await self.translate_agent.send_segments_async(texts_to_translate, self.chunk_size)
translated_texts_map = dict(zip(texts_to_translate, translated_segments)) translated_texts_map = dict(zip(texts_to_translate, translated_segments))
# 将翻译结果映射回原始列表 # 将翻译结果映射回原始分段列表
final_translated_texts = [translated_texts_map.get(text, text) for text in original_texts] final_translated_texts = []
for segment in original_segments:
if self.segment_mode == "line":
if segment.strip() and segment in translated_texts_map:
final_translated_texts.append(translated_texts_map[segment])
else:
final_translated_texts.append(segment)
else:
if segment.startswith("@@EMPTY_LINES_"):
final_translated_texts.append(segment)
elif segment in translated_texts_map:
final_translated_texts.append(translated_texts_map[segment])
else:
final_translated_texts.append(segment)
# --- 步骤 3: 后处理并更新文档内容 (I/O密集型) --- # --- 步骤 3: 后处理并更新文档内容 (I/O密集型) ---
document.content = await asyncio.to_thread( document.content = await asyncio.to_thread(
self._after_translate, final_translated_texts, original_texts self._after_translate, final_translated_texts, original_segments
) )
return self return self