diff --git a/docutranslate/utils/markdown_splitter.py b/docutranslate/utils/markdown_splitter.py index e4b364d..8d8a297 100644 --- a/docutranslate/utils/markdown_splitter.py +++ b/docutranslate/utils/markdown_splitter.py @@ -1,7 +1,11 @@ # SPDX-FileCopyrightText: 2025 QinHan # SPDX-License-Identifier: MPL-2.0 import re -from typing import List, Tuple, Optional +from typing import List, Tuple + +# 定义一个特殊的合并标记,用于在 separators 中标记“这是一个被切断的代码块,需要无缝合并” +# 包含生僻字符防止冲突 +MERGE_CODE_TOKEN = "\u0000\u0000" def is_placeholder(text: str) -> bool: @@ -12,22 +16,36 @@ def is_placeholder(text: str) -> bool: class MarkdownBlockSplitter: def __init__(self, max_block_size: int = 5000): self.max_block_size = max_block_size - # 匹配 代码块 或 占位符 self.special_token_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~|)' @staticmethod def _get_bytes(text: str) -> int: return len(text.encode('utf-8')) - def split_with_layout(self, markdown_text: str) -> Tuple[List[str], List[str]]: - """ - 分割Markdown,并返回 (内容块列表, 分隔符列表) - separators[i] 是 chunks[i] 和 chunks[i+1] 之间的原始文本 - """ - # 1. 细粒度切分:将文本切分为 [Block, Separator, Block, Separator...] - raw_blocks, raw_separators = self._tokenize(markdown_text) + def _split_text_by_bytes(self, text: str, limit: int) -> List[str]: + """兜底逻辑:按字节强制切分""" + encoded = text.encode('utf-8') + if len(encoded) <= limit: + return [text] + result = [] + start = 0 + total_len = len(encoded) + while start < total_len: + end = min(start + limit, total_len) + if end < total_len: + while end > start: + try: + encoded[start:end].decode('utf-8') + break + except UnicodeDecodeError: + end -= 1 + chunk_str = encoded[start:end].decode('utf-8') + result.append(chunk_str) + start = end + return result - # 2. 聚合:将小的 Block 合并为大的 Chunk,同时合并中间的 Separator + def split_with_layout(self, markdown_text: str) -> Tuple[List[str], List[str]]: + raw_blocks, raw_separators = self._tokenize(markdown_text) chunks = [] final_separators = [] @@ -41,207 +59,212 @@ class MarkdownBlockSplitter: next_block = raw_blocks[i + 1] separator = raw_separators[i] - next_block_size = self._get_bytes(next_block) + # 注意:合并标记本身不占翻译配额,计算大小时可以忽略或按0计算 + # 但为了安全,我们按实际长度算(它很短) separator_size = self._get_bytes(separator) + next_block_size = self._get_bytes(next_block) - # 判断是否需要切分 - # 1. 遇到占位符,强制切分(为了保护图片不被混入翻译文本中) - # 2. 当前块 + 分隔符 + 下一块 超过最大限制 if is_placeholder(current_chunk) or is_placeholder(next_block) or \ (current_size + separator_size + next_block_size > self.max_block_size): - - # 结束当前块 chunks.append(current_chunk) - # 记录连接到下一块的分隔符 final_separators.append(separator) - - # 开始新块 current_chunk = next_block current_size = next_block_size else: - # 合并 - # 新的当前块 = 旧当前块 + 分隔符 + 下一块 current_chunk += separator + next_block current_size += separator_size + next_block_size - # 添加最后一个块 chunks.append(current_chunk) - return chunks, final_separators def _tokenize(self, text: str) -> Tuple[List[str], List[str]]: - """ - 将文本初步标记化为逻辑单元。 - 逻辑单元包括:代码块、占位符、普通段落。 - 单元之间的所有字符(通常是空白)都被视为分隔符。 - """ text = text.replace('\r\n', '\n') - - # 1. 按 代码块 和 占位符 初步切分 - # re.split 包含捕获组时,结果列表为: [Text, Token, Text, Token, Text] parts = re.split(self.special_token_pattern, text) - blocks = [] # 存储逻辑内容块 - separators = [] # 存储块之间的分隔符 - - # 临时缓冲区,用于处理 split 产生的纯文本部分 - def process_text_part(text_part): - if not text_part: - return [] - # 对普通文本,按段落(双换行)再次切分 - # 我们需要保留切分符,所以用捕获组 - sub_parts = re.split(r'(\n{2,})', text_part) - return sub_parts - - # 初始化:处理第一个部分 - # 整个流程是一个状态机,我们在寻找 "Content" -> "Separator" -> "Content" 的链条 - - # 为了简化逻辑,我们先把 parts 扁平化为一个 token 流 - # 流中的元素要么是重要Token(Code/PH),要么是普通文本(Text) - flat_tokens = [] - for i, part in enumerate(parts): - if not part: - continue - if re.match(self.special_token_pattern, part): - flat_tokens.append({'type': 'special', 'text': part}) - else: - # 普通文本,继续细分段落 - sub_parts = process_text_part(part) - for sp in sub_parts: - if not sp: continue - # 只有双换行才被明确视为分隔符逻辑,单换行通常归于段落内 - # 但为了精准还原,我们把所有 re.split 出来的项都视为独立单元 - flat_tokens.append({'type': 'text', 'text': sp}) - - if not flat_tokens: - return [], [] - - # 接下来进行 "Whitespace Shifting" (空白归约) - # 我们希望 block 是纯净的内容,separator 是 block 之间的空白 - # 例如: "Text \n " -> Block="Text", Sep=" \n ", Block="" - - normalized_blocks = [] - normalized_separators = [] - - current_block_text = "" + blocks = [] + separators = [] pending_separator = "" - for i, token in enumerate(flat_tokens): - content = token['text'] + def add_safe_block(content): + """添加普通文本块""" + nonlocal pending_separator + if self._get_bytes(content) <= self.max_block_size: + if blocks: + separators.append(pending_separator) + pending_separator = "" + blocks.append(content) + else: + # 文本如果还超限,只能强行字节切分(无法包裹,因为是普通文本) + sub_chunks = self._split_text_by_bytes(content, self.max_block_size) + for idx, sub in enumerate(sub_chunks): + sep = pending_separator if idx == 0 else "" + if blocks: separators.append(sep) + if idx == 0: pending_separator = "" + blocks.append(sub) - # 如果是特殊块(代码/占位符),它本身就是核心内容,前后不能有粘连 - if token['type'] == 'special': - if current_block_text: - normalized_blocks.append(current_block_text) - normalized_separators.append(pending_separator) - current_block_text = "" + for part in parts: + if not part: continue + + # === A. 特殊块 (代码/图片) === + if re.match(self.special_token_pattern, part): + # 1. 正常大小:直接添加 + if self._get_bytes(part) < self.max_block_size: + if blocks: + separators.append(pending_separator) + pending_separator = "" + else: + pending_separator = "" + blocks.append(part) + continue + + # 2. 超大代码块:进行“合成拆分” + # 判断是否是图片占位符(图片占位符理论上不会超大,但为了严谨) + if is_placeholder(part): + # 图片无法拆分,只能强制字节切断(极罕见情况) + add_safe_block(part) + continue + + # 是超大代码块,解析头尾 + # part 格式如: ```python\ncode...\n``` + lines = part.split('\n') + header = lines[0] # e.g., ```python + footer = lines[-1] # e.g., ``` + + # 获取中间的纯代码内容 + # 注意:如果 split 后只有一行或两行,说明结构有问题,直接降级 + if len(lines) < 2: + add_safe_block(part) + continue + + inner_content = "\n".join(lines[1:-1]) + + # 将中间内容按行切分 + inner_parts = re.split(r'(\n)', inner_content) + + # 重新组合成多个小的完整代码块 + current_sub_block_content = "" + + # 结算 pending_separator (代码块之前的文本分隔符) + if blocks: + separators.append(pending_separator) + pending_separator = "" + else: pending_separator = "" - normalized_blocks.append(content) - # 特殊块处理完,它的位置占住了,接下来的空白应该算作 separator - # 但我们需要看下一个 token 是啥。 - # 简单处理:将特殊块直接加入,接下来的文本如果是空白,就是 separator + # 遍历代码内容行 + first_chunk = True + + for sub in inner_parts: + # 尝试构建当前块:Header + Current + sub + Footer + # 我们需要预估加上这一行后,是否会超限 + # 预估大小 = Header + \n + Current + sub + \n + Footer + potential_content = current_sub_block_content + sub + + # 构造一个假想的完整块来测大小 + synthetic_block = f"{header}\n{potential_content}\n{footer}" + + if self._get_bytes(synthetic_block) > self.max_block_size: + # 如果加上这行就超了,先把之前的 current_sub_block_content 封包 + if current_sub_block_content: + wrapped_block = f"{header}\n{current_sub_block_content}\n{footer}" + blocks.append(wrapped_block) + + # 添加特殊合并标记(不是空字符串,而是 MERGE_CODE_TOKEN) + separators.append(MERGE_CODE_TOKEN) + + first_chunk = False + + # 新的一块开始 + current_sub_block_content = sub + else: + current_sub_block_content += sub + + # 添加最后剩余的部分 + if current_sub_block_content: + wrapped_block = f"{header}\n{current_sub_block_content}\n{footer}" + # 如果前面已经有分块了,这里需要加 merge token + # 如果这是唯一的块(虽然逻辑上不可能走到这),也不影响 + if not first_chunk: + separators.append(MERGE_CODE_TOKEN) + blocks.append(wrapped_block) + + # 代码块处理完,无需 pending_separator,因为这是原子的 continue - # 如果是普通文本 - # 检查是否全是空白(这是分隔符候选) - if not content.strip(): - # 如果当前没有积累的 block,这可能是开头的空白,或者是两个 special 块之间的空白 - if not normalized_blocks and not current_block_text: - # 忽略文件开头的空白,或者附加到下一个块? - # 为了对齐 list 长度,通常忽略开头,或者视为第一个块的一部分(如果不翻译) - pass - elif normalized_blocks and not current_block_text: - # 前面已经有一个完整块,现在还没开始新块,这个空白是 separator - # 如果之前已经有 pending_separator,则叠加 - if len(normalized_separators) < len(normalized_blocks): - normalized_separators.append(content) - else: - # 这种情况应该少见,追加到上一个 separator - normalized_separators[-1] += content - else: - # current_block_text 正在积累,遇到了空白 - # 比如 "Hello \n\n World" 中的 \n\n - # 结束当前块 - normalized_blocks.append(current_block_text) - current_block_text = "" - normalized_separators.append(content) - else: - # 是有内容的文本 - # 剥离前导空白(归入上一个分隔符)和尾随空白(归入下一个分隔符)? - # 简单起见,利用 rstrip 将尾部空白视为分隔符的一部分 + # === B. 普通文本 === + sub_parts = re.split(r'(\n)', part) + for sub in sub_parts: + if not sub: continue + if sub == '\n' or not sub.strip(): + pending_separator += sub + continue - # 更好的策略: - # 文本 token 自身可能包含换行(段落内)。 - # 我们只在 tokenize 阶段切分了 \n{2,}。 - # 所以 content 基本是一个完整的段落或代码块周围的文本。 + lstripped = sub.lstrip() + leading_ws = sub[:len(sub) - len(lstripped)] + body = lstripped.rstrip() + trailing_ws = lstripped[len(body):] - # 如果上一个块已经结束 (normalized_blocks > normalized_separators),说明缺分隔符 - if len(normalized_blocks) > len(normalized_separators): - # 这意味着两个非空文本紧挨着?理论上 tokenize 阶段应该切开了 - normalized_separators.append("") + pending_separator += leading_ws + add_safe_block(body) + pending_separator = trailing_ws - # 剥离尾部空白作为 potential separator - stripped = content.rstrip() - trailing_space = content[len(stripped):] + while len(separators) < len(blocks) - 1: + separators.append("\n") - if current_block_text: - # 合并到当前正在构建的段落(极少发生,因为我们按split切分) - current_block_text += content - else: - # 新的文本块 - # 但要注意,如果这个文本块前面有空白,那个空白已经在上面处理了 - # 这里只需要处理自己 - normalized_blocks.append(stripped) - if trailing_space: - # 这个尾部空白暂时存起来,看后面接什么 - # 实际上在我们的循环模型里,直接视为 separator 比较安全 - # 除非它是文件结尾 - if i < len(flat_tokens) - 1: - normalized_separators.append(trailing_space) - else: - # 文件末尾的空白,可以忽略或加回 block - normalized_blocks[-1] += trailing_space - - # 修正长度:separators 数量应该是 blocks - 1 - while len(normalized_separators) < len(normalized_blocks) - 1: - normalized_separators.append("\n\n") # 默认 fallback - - return normalized_blocks, normalized_separators + return blocks, separators def split_markdown_with_layout(markdown_text: str, max_block_size=5000) -> Tuple[List[str], List[str]]: - """ - 外部调用的主入口 - 返回: (chunks, separators) - """ splitter = MarkdownBlockSplitter(max_block_size=max_block_size) return splitter.split_with_layout(markdown_text) def join_markdown_with_layout(chunks: List[str], separators: List[str]) -> str: """ - 使用保存的分隔符还原 Markdown + 还原 Markdown。 + 重点:检测 MERGE_CODE_TOKEN,如果遇到,则剥离 Chunk A 的尾巴和 Chunk B 的头, + 实现无缝拼接。 """ if not chunks: return "" result = chunks[0] + for i in range(len(separators)): - # 安全检查,防止索引越界(虽然 split 保证了长度对应) sep = separators[i] if i < len(separators) else "\n\n" next_chunk = chunks[i + 1] if i + 1 < len(chunks) else "" - result += sep + next_chunk + + if sep == MERGE_CODE_TOKEN: + # === 执行代码块无缝合并 === + # 上一块:剥离末尾的 ``` + # 下一块:剥离包含语言标识的 ```lang + + # 1. 处理 result (当前累积的文本,结尾应该是 ```) + # 使用 rstrip 仅仅去掉空白,然后去掉最后三个反引号 + # 正则:匹配末尾的 ``` 以及可能的空白 + result = re.sub(r'```\s*$', '', result.rstrip()) + + # 2. 处理 next_chunk (开头应该是 ```python 或 ```) + # 正则:匹配开头的 ```加可选语言标识 加换行 + # 注意:翻译后的 next_chunk 可能包含前面多余的空行,先 lstrip 比较安全 + # 但也不能 lstrip 太多导致缩进丢失。代码块的 fence 通常顶格或跟随缩进。 + # 这里简单处理顶格的情况。 + next_chunk = re.sub(r'^\s*```.*\n', '', next_chunk, count=1) + + # 直接拼接,不加换行符,因为 split 时 content 包含了换行 + result += next_chunk + else: + # 普通拼接 + result += sep + next_chunk return result -# 兼容旧接口,防止其他地方报错 +# 兼容旧接口 def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]: chunks, _ = split_markdown_with_layout(markdown_text, max_block_size) return chunks def join_markdown_texts(markdown_texts: List[str]) -> str: - # 旧接口只能猜,建议尽量使用新接口 return "\n\n".join(markdown_texts) \ No newline at end of file