fix: MT模式改用JSON批处理+逐条回退,兼顾速度和可靠性

- MT模式: JSON批处理(616段→~12 chunks)
- 自动检测未翻译段(原文==译文),逐条回退重译
- 比纯逐条快约10倍,比纯批处理可靠

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-08 16:44:34 +08:00
parent 4893163221
commit 2d39c009cf

View File

@@ -182,6 +182,30 @@ class SegmentsTranslateAgent(Agent):
logger.error(f"原始prompt也不是有效的json格式: {original_segments}") logger.error(f"原始prompt也不是有效的json格式: {original_segments}")
return {"error": f"{original_segments}"} return {"error": f"{original_segments}"}
def _mt_json_result_handler(self, result: str, origin_prompt: str, logger: Logger) -> dict:
"""MT JSON batch: 解析JSON响应返回{key: translation}。"""
try:
original_chunk = json_repair.loads(origin_prompt)
repaired = json_repair.loads(result)
except (RuntimeError, JSONDecodeError):
raise AgentResultError("MT JSON parse failed")
if not isinstance(repaired, dict):
raise AgentResultError(f"MT JSON result not dict: {type(repaired)}")
out = {}
for key in original_chunk:
out[key] = str(repaired.get(key, original_chunk[key]))
return out
def _mt_json_error_handler(self, origin_prompt: str, logger: Logger) -> dict:
"""MT JSON batch error: 返回原文。"""
try:
original_chunk = json_repair.loads(origin_prompt)
return {k: str(v) for k, v in original_chunk.items()}
except Exception:
return {"0": origin_prompt}
def _mt_individual_result_handler(self, result: str, origin_prompt: str, logger: Logger) -> str: def _mt_individual_result_handler(self, result: str, origin_prompt: str, logger: Logger) -> str:
"""MT individual: 直接返回翻译结果。""" """MT individual: 直接返回翻译结果。"""
return result.strip() return result.strip()
@@ -235,16 +259,56 @@ class SegmentsTranslateAgent(Agent):
def send_segments(self, segments: list[str], chunk_size: int) -> list[str]: def send_segments(self, segments: list[str], chunk_size: int) -> list[str]:
if self.is_mt_mode: if self.is_mt_mode:
# MT models (qwen-mt-*) destroy ALL text separators — batching is impossible.
# Each segment must be sent individually for 100% reliable 1:1 mapping.
if not segments: if not segments:
return [] return []
return super().send_prompts( # JSON batching: 616 segments → ~12 JSON chunks
prompts=segments, indexed_originals, chunks, merged_indices_list = segments2json_chunks(segments, chunk_size)
result_handler=self._mt_individual_result_handler, prompts = [json.dumps(chunk, ensure_ascii=False) for chunk in chunks]
error_result_handler=self._mt_individual_error_handler, translated_chunks = super().send_prompts(
prompts=prompts,
result_handler=self._mt_json_result_handler,
error_result_handler=self._mt_json_error_handler,
) )
# Detect unchanged segments (MT model returned original text)
indexed_translated = indexed_originals.copy()
failed_indices = []
for chunk_result, chunk_original in zip(translated_chunks, chunks):
if not isinstance(chunk_result, dict):
for k in chunk_original:
failed_indices.append(int(k))
continue
for key in chunk_original:
val = chunk_result.get(key, "")
if isinstance(val, str) and val.strip() == str(chunk_original[key]).strip():
failed_indices.append(int(key))
indexed_translated[key] = str(val)
# Retry failed segments individually
if failed_indices:
self.logger.info(
f"MT JSON batch: {len(failed_indices)}/{len(segments)} segments unchanged, retrying individually"
)
retry_segments = [segments[i] for i in failed_indices]
retry_results = super().send_prompts(
prompts=retry_segments,
result_handler=self._mt_individual_result_handler,
error_result_handler=self._mt_individual_error_handler,
)
for idx, trans in zip(failed_indices, retry_results):
indexed_translated[str(idx)] = trans
# Reconstruct result list
result = []
last_end = 0
ls = list(indexed_translated.values())
for start, end in merged_indices_list:
result.extend(ls[last_end:start])
result.append("".join(map(str, ls[start:end])))
last_end = end
result.extend(ls[last_end:])
return result
# Non-MT mode: JSON batch translation # Non-MT mode: JSON batch translation
indexed_originals, chunks, merged_indices_list = segments2json_chunks(segments, chunk_size) indexed_originals, chunks, merged_indices_list = segments2json_chunks(segments, chunk_size)
prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks] prompts = [generate_prompt(json.dumps(chunk, ensure_ascii=False, indent=0), self.to_lang) for chunk in chunks]
@@ -283,14 +347,57 @@ class SegmentsTranslateAgent(Agent):
async def send_segments_async(self, segments: list[str], chunk_size: int) -> list[str]: async def send_segments_async(self, segments: list[str], chunk_size: int) -> list[str]:
if self.is_mt_mode: if self.is_mt_mode:
# MT models destroy ALL text separators — each segment must be sent individually.
if not segments: if not segments:
return [] return []
return await super().send_prompts_async( # JSON batching: 616 segments → ~12 JSON chunks
prompts=segments, indexed_originals, chunks, merged_indices_list = await asyncio.to_thread(
result_handler=self._mt_individual_result_handler, segments2json_chunks, segments, chunk_size
error_result_handler=self._mt_individual_error_handler,
) )
prompts = [json.dumps(chunk, ensure_ascii=False) for chunk in chunks]
translated_chunks = await super().send_prompts_async(
prompts=prompts,
result_handler=self._mt_json_result_handler,
error_result_handler=self._mt_json_error_handler,
)
# Detect unchanged segments
indexed_translated = indexed_originals.copy()
failed_indices = []
for chunk_result, chunk_original in zip(translated_chunks, chunks):
if not isinstance(chunk_result, dict):
for k in chunk_original:
failed_indices.append(int(k))
continue
for key in chunk_original:
val = chunk_result.get(key, "")
if isinstance(val, str) and val.strip() == str(chunk_original[key]).strip():
failed_indices.append(int(key))
indexed_translated[key] = str(val)
# Retry failed segments individually
if failed_indices:
self.logger.info(
f"MT JSON batch: {len(failed_indices)}/{len(segments)} segments unchanged, retrying individually"
)
retry_segments = [segments[i] for i in failed_indices]
retry_results = await super().send_prompts_async(
prompts=retry_segments,
result_handler=self._mt_individual_result_handler,
error_result_handler=self._mt_individual_error_handler,
)
for idx, trans in zip(failed_indices, retry_results):
indexed_translated[str(idx)] = trans
# Reconstruct result list
result = []
last_end = 0
ls = list(indexed_translated.values())
for start, end in merged_indices_list:
result.extend(ls[last_end:start])
result.append("".join(map(str, ls[start:end])))
last_end = end
result.extend(ls[last_end:])
return result
# Non-MT mode: JSON batch translation # Non-MT mode: JSON batch translation
indexed_originals, chunks, merged_indices_list = await asyncio.to_thread(segments2json_chunks, segments, indexed_originals, chunks, merged_indices_list = await asyncio.to_thread(segments2json_chunks, segments,