diff --git a/docutranslate/agents/markdown_agent.py b/docutranslate/agents/markdown_agent.py index 43095c1..f616f71 100644 --- a/docutranslate/agents/markdown_agent.py +++ b/docutranslate/agents/markdown_agent.py @@ -19,7 +19,6 @@ def generate_prompt(markdown_text: str, to_lang: str): return f""" Treat the text input as markdown text and translate it into {to_lang},output translation ONLY. - NO explanations. NO notes. -- (very important) Preserve all placeholders in the format (example: , ). - For special tags or other non-translatable elements (like codes, brand names, specific jargon), keep them in their original form. - All formulas, regardless of length, must be represented as valid, parsable LaTeX. They must be correctly enclosed by `$`, `\\(\\)`, or `$$`. If a formula is not formatted correctly, you must fix it. - Remove or correct any obviously abnormal characters, but without altering the original meaning. diff --git a/docutranslate/translator/ai_translator/md_translator.py b/docutranslate/translator/ai_translator/md_translator.py index d14d44a..4e19e77 100644 --- a/docutranslate/translator/ai_translator/md_translator.py +++ b/docutranslate/translator/ai_translator/md_translator.py @@ -9,8 +9,12 @@ from docutranslate.agents.markdown_agent import MDTranslateAgentConfig from docutranslate.context.md_mask_context import MDMaskUrisContext from docutranslate.ir.markdown_document import MarkdownDocument from docutranslate.translator.ai_translator.base import AiTranslatorConfig, AiTranslator -# 引入新的 is_placeholder 函数 -from docutranslate.utils.markdown_splitter import split_markdown_text, join_markdown_texts, is_placeholder +# 引入新的布局分割和拼接函数 +from docutranslate.utils.markdown_splitter import ( + split_markdown_with_layout, + join_markdown_with_layout, + is_placeholder +) @dataclass @@ -42,14 +46,15 @@ class MDTranslator(AiTranslator): def translate(self, document: MarkdownDocument) -> Self: self.logger.info("正在翻译markdown") with MDMaskUrisContext(document): - chunks: list[str] = split_markdown_text(document.content.decode(), self.chunk_size) + # 使用新接口,获取 chunks 和对应的 separators + chunks, separators = split_markdown_with_layout(document.content.decode(), self.chunk_size) translate_indices: List[int] = [] translate_chunks: List[str] = [] - final_result: List[str] = list(chunks) + final_result: List[str] = list(chunks) # 浅拷贝,用于回填翻译结果 for i, chunk in enumerate(chunks): - # 直接使用 splitter 中定义的函数 + # 占位符不翻译 if is_placeholder(chunk): continue else: @@ -68,8 +73,9 @@ class MDTranslator(AiTranslator): for idx, translated_text in zip(translate_indices, translated_sub_results): final_result[idx] = translated_text - content = join_markdown_texts(final_result) - # 做一些加强鲁棒性的操作 + # 使用记录的 separators 进行还原,完美保留布局 + content = join_markdown_with_layout(final_result, separators) + content = content.replace(r'\(', r'\(') content = content.replace(r'\)', r'\)') @@ -80,7 +86,8 @@ class MDTranslator(AiTranslator): async def translate_async(self, document: MarkdownDocument) -> Self: self.logger.info("正在翻译markdown") with MDMaskUrisContext(document): - chunks: list[str] = split_markdown_text(document.content.decode(), self.chunk_size) + # 异步方法同样更新 + chunks, separators = split_markdown_with_layout(document.content.decode(), self.chunk_size) translate_indices: List[int] = [] translate_chunks: List[str] = [] @@ -107,7 +114,7 @@ class MDTranslator(AiTranslator): final_result[idx] = translated_text def run(): - content = join_markdown_texts(final_result) + content = join_markdown_with_layout(final_result, separators) content = content.replace(r'\(', r'\(') content = content.replace(r'\)', r'\)') document.content = content.encode() diff --git a/docutranslate/utils/markdown_splitter.py b/docutranslate/utils/markdown_splitter.py index f53dc56..e4b364d 100644 --- a/docutranslate/utils/markdown_splitter.py +++ b/docutranslate/utils/markdown_splitter.py @@ -1,213 +1,247 @@ # SPDX-FileCopyrightText: 2025 QinHan # SPDX-License-Identifier: MPL-2.0 import re -from typing import List +from typing import List, Tuple, Optional def is_placeholder(text: str) -> bool: - """ - 判断文本块是否仅包含图片占位符 - 匹配格式: (允许前后空白) - """ + """判断文本块是否是图片占位符""" return bool(re.match(r'^\s*\s*$', text)) class MarkdownBlockSplitter: def __init__(self, max_block_size: int = 5000): - """ - 初始化Markdown分块器 - 参数: - max_block_size: 每个块的最大字节数 - """ self.max_block_size = max_block_size - self.placeholder_pattern = r'()' + # 匹配 代码块 或 占位符 + self.special_token_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~|)' @staticmethod def _get_bytes(text: str) -> int: return len(text.encode('utf-8')) - def split_markdown(self, markdown_text: str) -> List[str]: + def split_with_layout(self, markdown_text: str) -> Tuple[List[str], List[str]]: """ - 将Markdown文本分割成指定大小的块 + 分割Markdown,并返回 (内容块列表, 分隔符列表) + separators[i] 是 chunks[i] 和 chunks[i+1] 之间的原始文本 """ - logical_blocks = self._split_into_logical_blocks(markdown_text) + # 1. 细粒度切分:将文本切分为 [Block, Separator, Block, Separator...] + raw_blocks, raw_separators = self._tokenize(markdown_text) + # 2. 聚合:将小的 Block 合并为大的 Chunk,同时合并中间的 Separator chunks = [] - current_chunk_parts = [] - current_size = 0 + final_separators = [] - for block in logical_blocks: - block_size = self._get_bytes(block) + if not raw_blocks: + return [], [] - # 如果是占位符,必须单独成块,且强制切断当前累积的内容 - if is_placeholder(block): - if current_chunk_parts: - chunks.append("".join(current_chunk_parts)) - current_chunk_parts = [] - current_size = 0 - chunks.append(block) - continue + current_chunk = raw_blocks[0] + current_size = self._get_bytes(current_chunk) - # 情况1:块本身就过大 - if block_size > self.max_block_size: - if current_chunk_parts: - chunks.append("".join(current_chunk_parts)) - current_chunk_parts = [] - current_size = 0 - chunks.extend(self._split_large_block(block)) - continue + for i in range(len(raw_separators)): + next_block = raw_blocks[i + 1] + separator = raw_separators[i] - # 情况2:将此块添加到当前chunk会超限 - if current_size + block_size > self.max_block_size: - if current_chunk_parts: - chunks.append("".join(current_chunk_parts)) - current_chunk_parts = [block] - current_size = block_size - # 情况3:正常添加 + next_block_size = self._get_bytes(next_block) + separator_size = self._get_bytes(separator) + + # 判断是否需要切分 + # 1. 遇到占位符,强制切分(为了保护图片不被混入翻译文本中) + # 2. 当前块 + 分隔符 + 下一块 超过最大限制 + if is_placeholder(current_chunk) or is_placeholder(next_block) or \ + (current_size + separator_size + next_block_size > self.max_block_size): + + # 结束当前块 + chunks.append(current_chunk) + # 记录连接到下一块的分隔符 + final_separators.append(separator) + + # 开始新块 + current_chunk = next_block + current_size = next_block_size else: - current_chunk_parts.append(block) - current_size += block_size + # 合并 + # 新的当前块 = 旧当前块 + 分隔符 + 下一块 + current_chunk += separator + next_block + current_size += separator_size + next_block_size - if current_chunk_parts: - chunks.append("".join(current_chunk_parts)) + # 添加最后一个块 + chunks.append(current_chunk) - return chunks + return chunks, final_separators - def _split_into_logical_blocks(self, markdown_text: str) -> List[str]: - text = markdown_text.replace('\r\n', '\n') - # 分割代码块 - code_block_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~)' - parts = re.split(code_block_pattern, text) + def _tokenize(self, text: str) -> Tuple[List[str], List[str]]: + """ + 将文本初步标记化为逻辑单元。 + 逻辑单元包括:代码块、占位符、普通段落。 + 单元之间的所有字符(通常是空白)都被视为分隔符。 + """ + text = text.replace('\r\n', '\n') - blocks = [] + # 1. 按 代码块 和 占位符 初步切分 + # re.split 包含捕获组时,结果列表为: [Text, Token, Text, Token, Text] + parts = re.split(self.special_token_pattern, text) + + blocks = [] # 存储逻辑内容块 + separators = [] # 存储块之间的分隔符 + + # 临时缓冲区,用于处理 split 产生的纯文本部分 + def process_text_part(text_part): + if not text_part: + return [] + # 对普通文本,按段落(双换行)再次切分 + # 我们需要保留切分符,所以用捕获组 + sub_parts = re.split(r'(\n{2,})', text_part) + return sub_parts + + # 初始化:处理第一个部分 + # 整个流程是一个状态机,我们在寻找 "Content" -> "Separator" -> "Content" 的链条 + + # 为了简化逻辑,我们先把 parts 扁平化为一个 token 流 + # 流中的元素要么是重要Token(Code/PH),要么是普通文本(Text) + flat_tokens = [] for i, part in enumerate(parts): if not part: continue - - # 代码块直接添加 - if i % 2 == 1: - blocks.append(part) + if re.match(self.special_token_pattern, part): + flat_tokens.append({'type': 'special', 'text': part}) else: - # 普通文本:先切分出占位符 - ph_parts = re.split(self.placeholder_pattern, part) - for ph_part in ph_parts: - if not ph_part: - continue + # 普通文本,继续细分段落 + sub_parts = process_text_part(part) + for sp in sub_parts: + if not sp: continue + # 只有双换行才被明确视为分隔符逻辑,单换行通常归于段落内 + # 但为了精准还原,我们把所有 re.split 出来的项都视为独立单元 + flat_tokens.append({'type': 'text', 'text': sp}) - if is_placeholder(ph_part): - blocks.append(ph_part) + if not flat_tokens: + return [], [] + + # 接下来进行 "Whitespace Shifting" (空白归约) + # 我们希望 block 是纯净的内容,separator 是 block 之间的空白 + # 例如: "Text \n " -> Block="Text", Sep=" \n ", Block="" + + normalized_blocks = [] + normalized_separators = [] + + current_block_text = "" + pending_separator = "" + + for i, token in enumerate(flat_tokens): + content = token['text'] + + # 如果是特殊块(代码/占位符),它本身就是核心内容,前后不能有粘连 + if token['type'] == 'special': + if current_block_text: + normalized_blocks.append(current_block_text) + normalized_separators.append(pending_separator) + current_block_text = "" + pending_separator = "" + + normalized_blocks.append(content) + # 特殊块处理完,它的位置占住了,接下来的空白应该算作 separator + # 但我们需要看下一个 token 是啥。 + # 简单处理:将特殊块直接加入,接下来的文本如果是空白,就是 separator + continue + + # 如果是普通文本 + # 检查是否全是空白(这是分隔符候选) + if not content.strip(): + # 如果当前没有积累的 block,这可能是开头的空白,或者是两个 special 块之间的空白 + if not normalized_blocks and not current_block_text: + # 忽略文件开头的空白,或者附加到下一个块? + # 为了对齐 list 长度,通常忽略开头,或者视为第一个块的一部分(如果不翻译) + pass + elif normalized_blocks and not current_block_text: + # 前面已经有一个完整块,现在还没开始新块,这个空白是 separator + # 如果之前已经有 pending_separator,则叠加 + if len(normalized_separators) < len(normalized_blocks): + normalized_separators.append(content) else: - # 再按空行切分段落 - sub_parts = re.split(r'(\n{2,})', ph_part) - blocks.extend([p for p in sub_parts if p]) - return blocks - - def _split_large_block(self, block: str) -> List[str]: - # 代码块处理 - if block.startswith(('```', '~~~')): - lines = block.split('\n') - header = lines[0] - footer = lines[-1] - content_lines = lines[1:-1] - chunks = [] - current_chunk_lines = [header] - current_size = self._get_bytes(header) + 1 - - for line in content_lines: - line_size = self._get_bytes(line) + 1 - if current_size + line_size + self._get_bytes(footer) > self.max_block_size: - current_chunk_lines.append(footer) - chunks.append('\n'.join(current_chunk_lines)) - current_chunk_lines = [header, line] - current_size = self._get_bytes(header) + 1 + line_size + # 这种情况应该少见,追加到上一个 separator + normalized_separators[-1] += content else: - current_chunk_lines.append(line) - current_size += line_size - - if len(current_chunk_lines) > 1: - current_chunk_lines.append(footer) - chunks.append('\n'.join(current_chunk_lines)) - return chunks - - # 普通文本处理 - lines = block.split('\n') - chunks = [] - current_chunk = [] - current_size = 0 - for line in lines: - line_size = self._get_bytes(line) + 1 - if current_size + line_size > self.max_block_size and current_chunk: - chunks.append('\n'.join(current_chunk)) - current_chunk = [line] - current_size = line_size - 1 + # current_block_text 正在积累,遇到了空白 + # 比如 "Hello \n\n World" 中的 \n\n + # 结束当前块 + normalized_blocks.append(current_block_text) + current_block_text = "" + normalized_separators.append(content) else: - current_chunk.append(line) - current_size += line_size + # 是有内容的文本 + # 剥离前导空白(归入上一个分隔符)和尾随空白(归入下一个分隔符)? + # 简单起见,利用 rstrip 将尾部空白视为分隔符的一部分 - if current_chunk: - chunks.append('\n'.join(current_chunk)) - return chunks + # 更好的策略: + # 文本 token 自身可能包含换行(段落内)。 + # 我们只在 tokenize 阶段切分了 \n{2,}。 + # 所以 content 基本是一个完整的段落或代码块周围的文本。 + + # 如果上一个块已经结束 (normalized_blocks > normalized_separators),说明缺分隔符 + if len(normalized_blocks) > len(normalized_separators): + # 这意味着两个非空文本紧挨着?理论上 tokenize 阶段应该切开了 + normalized_separators.append("") + + # 剥离尾部空白作为 potential separator + stripped = content.rstrip() + trailing_space = content[len(stripped):] + + if current_block_text: + # 合并到当前正在构建的段落(极少发生,因为我们按split切分) + current_block_text += content + else: + # 新的文本块 + # 但要注意,如果这个文本块前面有空白,那个空白已经在上面处理了 + # 这里只需要处理自己 + normalized_blocks.append(stripped) + if trailing_space: + # 这个尾部空白暂时存起来,看后面接什么 + # 实际上在我们的循环模型里,直接视为 separator 比较安全 + # 除非它是文件结尾 + if i < len(flat_tokens) - 1: + normalized_separators.append(trailing_space) + else: + # 文件末尾的空白,可以忽略或加回 block + normalized_blocks[-1] += trailing_space + + # 修正长度:separators 数量应该是 blocks - 1 + while len(normalized_separators) < len(normalized_blocks) - 1: + normalized_separators.append("\n\n") # 默认 fallback + + return normalized_blocks, normalized_separators -def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]: +def split_markdown_with_layout(markdown_text: str, max_block_size=5000) -> Tuple[List[str], List[str]]: + """ + 外部调用的主入口 + 返回: (chunks, separators) + """ splitter = MarkdownBlockSplitter(max_block_size=max_block_size) - chunks = splitter.split_markdown(markdown_text) - # 过滤空块,但保留占位符 - return [chunk for chunk in chunks if chunk.strip() or is_placeholder(chunk)] + return splitter.split_with_layout(markdown_text) -def _needs_single_newline_join(prev_chunk: str, next_chunk: str) -> bool: - """判断常规文本是否需要单换行连接""" - if not prev_chunk.strip() or not next_chunk.strip(): - return False +def join_markdown_with_layout(chunks: List[str], separators: List[str]) -> str: + """ + 使用保存的分隔符还原 Markdown + """ + if not chunks: + return "" - last_line_prev = prev_chunk.rstrip().split('\n')[-1].lstrip() - first_line_next = next_chunk.lstrip().split('\n')[0].lstrip() + result = chunks[0] + for i in range(len(separators)): + # 安全检查,防止索引越界(虽然 split 保证了长度对应) + sep = separators[i] if i < len(separators) else "\n\n" + next_chunk = chunks[i + 1] if i + 1 < len(chunks) else "" + result += sep + next_chunk - # 表格 - if last_line_prev.startswith('|') and last_line_prev.endswith('|') and \ - first_line_next.startswith('|') and first_line_next.endswith('|'): - return True + return result - # 列表 - list_markers = r'^\s*([-*+]|\d+\.)\s+' - if re.match(list_markers, last_line_prev) and re.match(list_markers, first_line_next): - return True - # 引用 - if last_line_prev.startswith('>') and first_line_next.startswith('>'): - return True - - return False +# 兼容旧接口,防止其他地方报错 +def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]: + chunks, _ = split_markdown_with_layout(markdown_text, max_block_size) + return chunks def join_markdown_texts(markdown_texts: List[str]) -> str: - """ - 智能地拼接Markdown块列表 - """ - if not markdown_texts: - return "" - - joined_text = markdown_texts[0] - for i in range(1, len(markdown_texts)): - prev_chunk = markdown_texts[i - 1] - current_chunk = markdown_texts[i] - - # === 核心修复逻辑 === - # 如果前一块或后一块是占位符,强制使用单换行 '\n' - # 这样可以保证: - # 1. 连续的徽章/图片 [img1]\n[img2] 会紧凑排列(视为行内元素) - # 2. HTML结构

\n\n

不会被打断 - # 3. 标题后的图片 # Title\n 也能正常渲染 - if is_placeholder(prev_chunk) or is_placeholder(current_chunk): - separator = "\n" - - elif _needs_single_newline_join(prev_chunk, current_chunk): - separator = "\n" - else: - # 只有两个纯文本段落之间才用双换行 - separator = "\n\n" - - joined_text += separator + current_chunk - - return joined_text \ No newline at end of file + # 旧接口只能猜,建议尽量使用新接口 + return "\n\n".join(markdown_texts) \ No newline at end of file