fix: MT模式改为逐条翻译,彻底消除标记依赖和29.4%内容丢失

- MT模式不再打包segment为<<<SEG:n>>>标记格式
- 改为每个segment一次纯文本API调用,1:1映射,零标记零解析
- 删除_chunk_to_mt_prompt/_parse_mt_response/_result_handler_mt等~100行死代码
- 新增_mt_simple_result_handler/_mt_simple_error_handler
- 非MT模式(JSON批处理)不受影响

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-08 15:20:06 +08:00
parent 52bb8858c8
commit 4cf1a8c67d

View File

@@ -15,10 +15,6 @@ from docutranslate.agents.agent import PartialAgentResultError, AgentResultError
from docutranslate.glossary.glossary import Glossary from docutranslate.glossary.glossary import Glossary
from docutranslate.utils.json_utils import segments2json_chunks, fix_json_string from docutranslate.utils.json_utils import segments2json_chunks, fix_json_string
# MT mode plain-text segment marker — intended to pass through machine translation unchanged.
# Group 1 is the numeric segment id; group 2 is everything up to the next marker or the end
# of the text (re.DOTALL lets it span lines). Whitespace after the marker is optional, so a
# marker followed by text on the same line, or a trailing marker with nothing after it
# (e.g. once the caller has stripped the final newline), is still recognised.
# Group 2 may carry trailing whitespace; callers are expected to .strip() it.
MT_SEG_MARKER_RE = re.compile(r'<<<SEG:(\d+)>>>\s*(.*?)(?=<<<SEG:\d+>>>|\Z)', re.DOTALL)
def generate_prompt(json_segments: str, to_lang: str): def generate_prompt(json_segments: str, to_lang: str):
return f""" return f"""
You will receive a sequence of original text segments to be translated, represented in JSON format. The keys are segment IDs, and the values are the text content to be translated. You will receive a sequence of original text segments to be translated, represented in JSON format. The keys are segment IDs, and the values are the text content to be translated.
@@ -95,44 +91,6 @@ def get_target_segments(result: str):
return result return result
def _chunk_to_mt_prompt(chunk: dict) -> str:
"""Convert a JSON chunk like {'0': 'text1', '1': 'text2'} to MT-friendly plain text."""
parts = []
for key in sorted(chunk.keys(), key=int):
parts.append(f"<<<SEG:{key}>>>\n{chunk[key]}")
return "\n".join(parts)
def _parse_mt_prompt_to_dict(mt_prompt: str) -> dict:
    """Recover the segment dict from a marker-delimited MT prompt.

    Each match of ``MT_SEG_MARKER_RE`` contributes one entry: the marker id
    (as a string) mapped to the stripped text that follows it. If an id
    occurs more than once, the last occurrence wins.

    When no marker is found at all, the whole prompt is returned unchanged
    as the single segment ``"0"``.
    """
    segments = {
        found.group(1): found.group(2).strip()
        for found in MT_SEG_MARKER_RE.finditer(mt_prompt)
    }
    if segments:
        return segments
    # No marker recognised — treat the entire prompt as one segment.
    return {"0": mt_prompt}
def _parse_mt_response(text: str, original_chunk: dict) -> dict:
    """Map a marker-delimited MT response back onto the keys of the source chunk.

    Marker ids found in ``text`` that are not keys of ``original_chunk`` are
    ignored. Every key of ``original_chunk`` that the response did not supply
    is added with an empty string as its value, so the returned dict always
    has exactly the keys of ``original_chunk``.
    """
    recognised = {
        hit.group(1): hit.group(2).strip()
        for hit in MT_SEG_MARKER_RE.finditer(text)
        if hit.group(1) in original_chunk
    }
    # Keys the response did not provide are present but empty.
    for seg_id in original_chunk:
        recognised.setdefault(seg_id, "")
    return recognised
@dataclass(kw_only=True) @dataclass(kw_only=True)
class SegmentsTranslateAgentConfig(AgentConfig): class SegmentsTranslateAgentConfig(AgentConfig):
to_lang: str to_lang: str
@@ -162,14 +120,8 @@ class SegmentsTranslateAgent(Agent):
return system_prompt, prompt return system_prompt, prompt
def _result_handler(self, result: str, origin_prompt: str, logger: Logger): def _result_handler(self, result: str, origin_prompt: str, logger: Logger):
""" """处理非MT模式的JSON翻译响应。"""
处理成功的API响应。 # --- JSON-based ---
MT模式下使用 <<<SEG:n>>> 标记解析纯文本响应避免JSON格式不兼容问题。
"""
if self.is_mt_mode:
return self._result_handler_mt(result, origin_prompt, logger)
# --- Non-MT mode (JSON-based) ---
original_segments = get_original_segments(origin_prompt) original_segments = get_original_segments(origin_prompt)
result = get_target_segments(result) result = get_target_segments(result)
if result == "": if result == "":
@@ -216,77 +168,8 @@ class SegmentsTranslateAgent(Agent):
except (RuntimeError, JSONDecodeError) as e: except (RuntimeError, JSONDecodeError) as e:
raise AgentResultError(f"结果处理失败: {e.__repr__()}") raise AgentResultError(f"结果处理失败: {e.__repr__()}")
def _result_handler_mt(self, result: str, origin_prompt: str, logger: Logger) -> dict:
"""MT-mode result handler: parse a plain-text response in <<<SEG:n>>> marker format.

Returns a dict of segment id -> translated text. Raises AgentResultError when the
response is empty for a non-empty prompt, when the parsed result is identical to
the source, or when nothing usable can be extracted; raises PartialAgentResultError
on a key mismatch.

NOTE(review): indentation is not preserved in this view, so the nesting of the
branches below has to be inferred from the data flow — confirm against the real file.
"""
result_clean = result.strip()
# An empty response is only acceptable when the prompt itself was empty.
if result_clean == "":
if origin_prompt.strip() != "":
raise AgentResultError("result为空值但原文不为空")
return {}

# Rebuild the source segments from the prompt so the response can be compared to them.
original_chunk = _parse_mt_prompt_to_dict(origin_prompt)
original_keys = set(original_chunk.keys())

# Try parsing with <<<SEG:n>>> markers
parsed = _parse_mt_response(result_clean, original_chunk)
# Only trust the marker-based parse if at least one segment came back non-empty.
if parsed and any(v.strip() for v in parsed.values()):
result_keys = set(parsed.keys())
if result_keys == original_keys:
# Check if result is identical to original (no translation happened)
all_same = all(
parsed.get(k, "").strip() == str(original_chunk.get(k, "")).strip()
for k in original_keys
)
if all_same:
raise AgentResultError("翻译结果与原文完全相同,疑似翻译失败,将进行重试。")
return parsed
# If key mismatch, try as Partial result
# NOTE(review): _parse_mt_response, as written in this file, drops ids that are not in
# original_chunk and adds "" for every id it did not find, so result_keys appears to
# always equal original_keys and this branch looks unreachable — segments the MT engine
# dropped would then be returned as empty strings by the branch above. Confirm.
if result_keys and result_keys != original_keys:
final_chunk = {}
for key in original_keys:
final_chunk[key] = parsed.get(key, str(original_chunk.get(key, "")))
raise PartialAgentResultError(
"MT模式键不匹配触发重试",
partial_result=final_chunk,
append_prompt="\nPreserve all <<<SEG:n>>> markers exactly as they appear.\n"
)

# Fallback: Try line-by-line mapping (MT model might have removed markers)
result_lines = [line.strip() for line in result_clean.split('\n') if line.strip()]
# NOTE(review): original_seg_list is never read after this assignment.
original_seg_list = [str(original_chunk.get(str(i), "")) for i in range(len(original_chunk))]
# result_lines already excludes blank lines, so this filter keeps the same elements.
non_empty_lines = [l for l in result_lines if l]
# One non-blank line per source segment: map lines to ids "0".."n-1" by position.
if len(non_empty_lines) == len(original_chunk):
repaired = {str(i): non_empty_lines[i] for i in range(len(non_empty_lines))}
all_same = all(
repaired.get(k, "").strip() == str(original_chunk.get(k, "")).strip()
for k in original_keys
)
if all_same:
raise AgentResultError("翻译结果与原文完全相同(逐行),疑似翻译失败,将进行重试。")
return repaired

# Last fallback: assign all result text to first key
# Every other id "1".."n-1" is left as an empty string.
if non_empty_lines:
repaired = {str(i): "" for i in range(len(original_chunk))}
repaired["0"] = "\n".join(non_empty_lines)
return repaired

raise AgentResultError("MT模式无法解析响应")
def _error_result_handler(self, origin_prompt: str, logger: Logger): def _error_result_handler(self, origin_prompt: str, logger: Logger):
""" """非MT模式: 所有重试失败后返回原文。"""
处理在所有重试后仍然失败的请求。
作为备用方案,返回原文内容。
"""
if self.is_mt_mode:
original_chunk = _parse_mt_prompt_to_dict(origin_prompt)
for key in list(original_chunk.keys()):
original_chunk[key] = f"{original_chunk[key]}"
return original_chunk
original_segments = get_original_segments(origin_prompt) original_segments = get_original_segments(origin_prompt)
if original_segments == "": if original_segments == "":
return {} return {}
@@ -299,12 +182,26 @@ class SegmentsTranslateAgent(Agent):
logger.error(f"原始prompt也不是有效的json格式: {original_segments}") logger.error(f"原始prompt也不是有效的json格式: {original_segments}")
return {"error": f"{original_segments}"} return {"error": f"{original_segments}"}
def _mt_simple_result_handler(self, result: str, origin_prompt: str, logger: Logger) -> str:
"""MT mode: 直接返回翻译结果,不解析标记/JSON。"""
return result.strip()
def _mt_simple_error_handler(self, origin_prompt: str, logger: Logger) -> str:
"""MT mode error fallback: 返回原文。"""
return origin_prompt
def send_segments(self, segments: list[str], chunk_size: int) -> list[str]: def send_segments(self, segments: list[str], chunk_size: int) -> list[str]:
indexed_originals, chunks, merged_indices_list = segments2json_chunks(segments, chunk_size)
if self.is_mt_mode: if self.is_mt_mode:
prompts = [_chunk_to_mt_prompt(chunk) for chunk in chunks] # MT mode: send each segment individually as plain text, no markers, no batching
else: return super().send_prompts(
prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks] prompts=segments,
result_handler=self._mt_simple_result_handler,
error_result_handler=self._mt_simple_error_handler,
)
# Non-MT mode: JSON batch translation
indexed_originals, chunks, merged_indices_list = segments2json_chunks(segments, chunk_size)
prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks]
translated_chunks = super().send_prompts(prompts=prompts, json_format=self.force_json, translated_chunks = super().send_prompts(prompts=prompts, json_format=self.force_json,
pre_send_handler=self._pre_send_handler, pre_send_handler=self._pre_send_handler,
result_handler=self._result_handler, result_handler=self._result_handler,
@@ -339,13 +236,18 @@ class SegmentsTranslateAgent(Agent):
return result return result
async def send_segments_async(self, segments: list[str], chunk_size: int) -> list[str]: async def send_segments_async(self, segments: list[str], chunk_size: int) -> list[str]:
if self.is_mt_mode:
# MT mode: send each segment individually as plain text, no markers, no batching
return await super().send_prompts_async(
prompts=segments,
result_handler=self._mt_simple_result_handler,
error_result_handler=self._mt_simple_error_handler,
)
# Non-MT mode: JSON batch translation
indexed_originals, chunks, merged_indices_list = await asyncio.to_thread(segments2json_chunks, segments, indexed_originals, chunks, merged_indices_list = await asyncio.to_thread(segments2json_chunks, segments,
chunk_size) chunk_size)
if self.is_mt_mode: prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks]
prompts = [_chunk_to_mt_prompt(chunk) for chunk in chunks]
else:
prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks]
translated_chunks = await super().send_prompts_async(prompts=prompts, force_json=self.force_json, translated_chunks = await super().send_prompts_async(prompts=prompts, force_json=self.force_json,
pre_send_handler=self._pre_send_handler, pre_send_handler=self._pre_send_handler,
result_handler=self._result_handler, result_handler=self._result_handler,