回退老的markdown翻译方式

This commit is contained in:
xunbu
2025-08-18 17:03:42 +08:00
parent 9c8a409c5d
commit 943c540edc
4 changed files with 67 additions and 129 deletions

View File

@@ -9,6 +9,7 @@ from urllib.parse import urlparse
import httpx
from docutranslate.global_values import USE_PROXY
from docutranslate.logger import global_logger
MAX_RETRY_COUNT = 2
@@ -88,8 +89,9 @@ class Agent:
self.temperature = config.temperature
# self.client = httpx.Client(trust_env=False, proxy=None, verify=False)
# self.client_async = httpx.AsyncClient(trust_env=False, proxy=None, verify=False)
self.client = httpx.Client(verify=False)
self.client_async = httpx.AsyncClient(verify=False)
self.client = httpx.Client(verify=False) if USE_PROXY else httpx.Client(proxy=None, verify=False)
self.client_async = httpx.AsyncClient(verify=False) if USE_PROXY else httpx.AsyncClient(proxy=None,
verify=False)
self.max_concurrent = config.max_concurrent
self.timeout = config.timeout
self.thinking = config.thinking

View File

@@ -7,6 +7,7 @@ from typing import Hashable
import httpx
from docutranslate.converter.x2md.base import X2MarkdownConverter, X2MarkdownConverterConfig
from docutranslate.global_values import USE_PROXY
from docutranslate.ir.document import Document
from docutranslate.ir.markdown_document import MarkdownDocument
from docutranslate.utils.markdown_utils import embed_inline_image_from_zip
@@ -29,11 +30,13 @@ timeout = httpx.Timeout(
write=200.0, # 写入超时 (发送数据的最长时间)
pool=1.0 # 从连接池获取连接的超时时间
)
if USE_PROXY:
client = httpx.Client(timeout=timeout, verify=False)
client_async = httpx.AsyncClient(timeout=timeout, verify=False)
else:
client = httpx.Client(trust_env=False, timeout=timeout, proxy=None, verify=False)
client_async = httpx.AsyncClient(trust_env=False, timeout=timeout, proxy=None, verify=False)
# client = httpx.Client(trust_env=False, timeout=timeout, proxy=None, verify=False)
# client_async = httpx.AsyncClient(trust_env=False, timeout=timeout, proxy=None, verify=False)
client = httpx.Client(timeout=timeout, verify=False)
client_async = httpx.AsyncClient(timeout=timeout, verify=False)
class ConverterMineru(X2MarkdownConverter):

View File

@@ -1,2 +1,8 @@
import os
from .conditional_import import available_packages,conditional_import
USE_PROXY=False
if os.getenv("DOCUTRANSLATE_USE_PROXY") and os.getenv("DOCUTRANSLATE_USE_PROXY").lower()=="true":
USE_PROXY=True

View File

@@ -1,32 +1,27 @@
import asyncio
from dataclasses import dataclass
from typing import Self, Literal, List, Dict, Any
from typing import Self
# 新增导入
from bs4 import BeautifulSoup, NavigableString
from markdown_it import MarkdownIt
from markdown_it.token import Token
from docutranslate.agents.segments_agent import SegmentsTranslateAgentConfig, SegmentsTranslateAgent
from docutranslate.ir.document import Document
from docutranslate.agents import MDTranslateAgent
from docutranslate.agents.markdown_agent import MDTranslateAgentConfig
from docutranslate.context.md_mask_context import MDMaskUrisContext
from docutranslate.ir.markdown_document import MarkdownDocument
from docutranslate.translator.ai_translator.base import AiTranslatorConfig
from docutranslate.translator.base import Translator
from docutranslate.utils.markdown_splitter import split_markdown_text, join_markdown_texts
from docutranslate.utils.markdown_utils import clean_markdown_math_block
@dataclass
class MDTranslatorConfig(AiTranslatorConfig):
insert_mode: Literal["replace"] = "replace"
translate_code_blocks: bool = False
...
class MDTranslator(Translator):
def __init__(self, config: MDTranslatorConfig):
super().__init__(config=config)
# ... (构造函数的其余部分保持不变)
self.chunk_size = config.chunk_size
agent_config = SegmentsTranslateAgentConfig(
custom_prompt=config.custom_prompt,
agent_config = MDTranslateAgentConfig(custom_prompt=config.custom_prompt,
to_lang=config.to_lang,
baseurl=config.base_url,
key=config.api_key,
@@ -36,108 +31,40 @@ class MDTranslator(Translator):
thinking=config.thinking,
max_concurrent=config.concurrent,
timeout=config.timeout,
logger=self.logger
)
self.translate_agent = SegmentsTranslateAgent(agent_config)
self.translate_agent.system_prompt+="对于参考文献,保持源语言不翻译"
self.config = config
# 启用 HTML 解析是安全的
self.md_parser = MarkdownIt("commonmark", {'html': True})
logger=self.logger)
self.translate_agent = MDTranslateAgent(agent_config)
def _pre_translate(self, document: Document) -> tuple[
List[Token], List[Dict[str, Any]], List[str], Dict[int, BeautifulSoup]]:
"""
预处理步骤:解析 Markdown 和嵌入的 HTML提取所有可翻译的纯文本。
"""
markdown_content = document.content.decode('utf-8')
tokens = self.md_parser.parse(markdown_content)
def translate(self, document: MarkdownDocument) -> Self:
self.logger.info("正在翻译markdown")
with MDMaskUrisContext(document):
chunks: list[str] = split_markdown_text(document.content.decode(), self.chunk_size)
self.logger.info(f"markdown分为{len(chunks)}")
result: list[str] = self.translate_agent.send_prompts(chunks)
content = join_markdown_texts(result)
# 做一些加强鲁棒性的操作
content = content.replace(r'\', r'\(')
content = content.replace(r'\', r'\)')
content = clean_markdown_math_block(content)
segments_to_translate = []
original_texts = []
# 新增:用于缓存已解析的 HTML DOM 树,避免重复解析
parsed_html_cache = {}
for i, token in enumerate(tokens):
# --- 分支 1: 处理标准 Markdown 内容 ---
if token.type == 'inline' and token.content:
for child_idx, child in enumerate(token.children):
if child.type == 'text' and child.content.strip():
segment_info = {
"type": "markdown", # 标记为 markdown 类型
"token_index": i,
"child_index": child_idx,
}
segments_to_translate.append(segment_info)
original_texts.append(child.content)
# --- 分支 2: 新增逻辑,处理嵌入的 HTML 块 ---
elif token.type == 'html_block' and token.content:
# 使用 BeautifulSoup 解析 HTML 内容
soup = BeautifulSoup(token.content, 'lxml')
parsed_html_cache[i] = soup # 缓存解析后的对象
# 查找所有文本节点 (NavigableString)
# 我们只翻译可见的、非空的文本内容
for text_node in soup.find_all(string=True):
if isinstance(text_node, NavigableString) and text_node.strip():
# 排除脚本和样式表内的文本
if text_node.parent.name in ['script', 'style']:
continue
segment_info = {
"type": "html", # 标记为 html 类型
"token_index": i, # 记录 html_block token 的索引
"node": text_node, # 直接引用文本节点对象
}
segments_to_translate.append(segment_info)
original_texts.append(text_node.string)
return tokens, segments_to_translate, original_texts, parsed_html_cache
def _after_translate(self, tokens: List[Token], segments_to_translate: List[Dict[str, Any]],
translated_texts: List[str], parsed_html_cache: Dict[int, BeautifulSoup]) -> bytes:
"""
翻译后处理步骤:将翻译后的文本分别写回 Markdown AST 和 HTML DOM然后重新渲染。
"""
for i, segment_info in enumerate(segments_to_translate):
translated_text = translated_texts[i]
token_index = segment_info["token_index"]
# --- 分支 1: 写回 Markdown Token ---
if segment_info["type"] == "markdown":
child_index = segment_info["child_index"]
tokens[token_index].children[child_index].content = translated_text
# --- 分支 2: 写回 HTML DOM (BeautifulSoup 对象) ---
elif segment_info["type"] == "html":
# 使用之前引用的节点对象,直接替换其内容
segment_info["node"].replace_with(translated_text)
# --- 新增步骤: 将修改后的 BeautifulSoup 对象渲染回字符串,更新 token ---
for token_index, soup in parsed_html_cache.items():
# 将 soup 对象转换回字符串prettify() 会进行格式化str() 则不会
tokens[token_index].content = str(soup)
renderer = self.md_parser.renderer
translated_markdown = renderer.render(tokens, self.md_parser.options, {})
return translated_markdown.encode('utf-8')
def translate(self, document: Document) -> Self:
tokens, segments, originals, cache = self._pre_translate(document)
if not originals:
self.logger.info("\n文件中没有找到需要翻译的纯文本内容。")
document.content = content.encode()
self.logger.info("翻译完成")
return self
translated = self.translate_agent.send_segments(originals, self.chunk_size)
document.content = self._after_translate(tokens, segments, translated, cache)
return self
async def translate_async(self, document: MarkdownDocument) -> Self:
self.logger.info("正在翻译markdown")
with MDMaskUrisContext(document):
chunks: list[str] = split_markdown_text(document.content.decode(), self.chunk_size)
self.logger.info(f"markdown分为{len(chunks)}")
result: list[str] = await self.translate_agent.send_prompts_async(chunks)
async def translate_async(self, document: Document) -> Self:
tokens, segments, originals, cache = await asyncio.to_thread(self._pre_translate, document)
if not originals:
self.logger.info("\n文件中没有找到需要翻译的纯文本内容。")
return self
def run():
content = join_markdown_texts(result)
# 做一些加强鲁棒性的操作
content = content.replace(r'\', r'\(')
content = content.replace(r'\', r'\)')
content = clean_markdown_math_block(content)
document.content = content.encode()
translated = await self.translate_agent.send_segments_async(originals, self.chunk_size)
document.content = await asyncio.to_thread(self._after_translate, tokens, segments, translated, cache)
await asyncio.to_thread(run)
self.logger.info("翻译完成")
return self