diff --git a/docutranslate/utils/markdown_splitter.py b/docutranslate/utils/markdown_splitter.py index 8bfaac8..e9b0ca6 100644 --- a/docutranslate/utils/markdown_splitter.py +++ b/docutranslate/utils/markdown_splitter.py @@ -8,7 +8,7 @@ class MarkdownBlockSplitter: 初始化Markdown分块器 参数: - max_block_size: 每个块的最大字符数 + max_block_size: 每个块的最大字节数 """ self.max_block_size = max_block_size @@ -16,241 +16,190 @@ class MarkdownBlockSplitter: def _get_bytes(text: str) -> int: return len(text.encode('utf-8')) - # TODO: 修复分块有时候会有空白块的问题 def split_markdown(self, markdown_text: str) -> List[str]: """ 将Markdown文本分割成指定大小的块 确保可以通过简单拼接重建原始文本(分割的代码块除外) 尽量保持标题与其对应内容在同一个块中 - - 参数: - markdown_text: 输入的Markdown文本 - - 返回: - Markdown块组成的列表 """ - # 首先将文本分割成逻辑块并保持结构 - blocks = self._split_into_logical_blocks(markdown_text) + # 1. 将文本分割成逻辑块 + logical_blocks = self._split_into_logical_blocks(markdown_text) - # 然后合并块,同时遵守大小限制并保持标题与内容在一起 - result_blocks = [] - current_block = [] + # 2. 合并逻辑块,使其不超过 max_block_size + chunks = [] + current_chunk_parts = [] current_size = 0 - pending_heading = None # 等待内容的标题 - for block in blocks: + for block in logical_blocks: block_size = self._get_bytes(block) - is_heading = bool(re.match(r'^#{1,6}\s+.+', block.strip())) - is_separator = block.strip() == '' and block.count('\n') > 0 - # 情况1:块本身过大,无法单独放入 + # 情况1:块本身就过大 if block_size > self.max_block_size: - # 先输出已积累的内容 - if current_block: - result_blocks.append('\n'.join(current_block)) - current_block = [] + # 先将当前积累的块输出 + if current_chunk_parts: + chunks.append("".join(current_chunk_parts)) + current_chunk_parts = [] current_size = 0 - pending_heading = None - # 分割大块并添加所有部分 - large_block_parts = self._split_large_block(block) - result_blocks.extend(large_block_parts) + # 分割这个超大块并直接添加到结果中 + chunks.extend(self._split_large_block(block)) continue - # 情况2:添加此块会超出大小限制 - if current_size + block_size + (1 if current_block else 0) > self.max_block_size: - # 如果有等待内容的标题,尝试将其与内容保持在一起 - if pending_heading and not is_heading and not is_separator: - # 如果只添加标题和此块能放下,则这样做 - if self._get_bytes(pending_heading) + block_size + 1 <= self.max_block_size: - result_blocks.append('\n'.join(current_block[:-1])) # 输出不含标题的内容 - current_block = [pending_heading, block] - current_size = self._get_bytes(pending_heading) + 1 + block_size - pending_heading = None - continue + # 情况2:将此块添加到当前chunk会超限 + if current_size + block_size > self.max_block_size: + if current_chunk_parts: + chunks.append("".join(current_chunk_parts)) - # 否则输出当前块并开始新块 - if current_block: - result_blocks.append('\n'.join(current_block)) - - current_block = [block] + current_chunk_parts = [block] current_size = block_size - pending_heading = block if is_heading else None - continue - - # 情况3:正常情况 - 添加到当前块 - if current_block: - current_block.append(block) - current_size += 1 + block_size # 加1是因为换行符 + # 情况3:正常添加 else: - current_block.append(block) - current_size = block_size + current_chunk_parts.append(block) + current_size += block_size - # 更新等待标题状态 - if is_heading: - pending_heading = block - elif not is_separator and pending_heading: - # 已在标题后添加内容,清除等待状态 - pending_heading = None + # 添加最后一个剩余的chunk + if current_chunk_parts: + chunks.append("".join(current_chunk_parts)) - # 添加最后一个块(如果存在) - if current_block: - result_blocks.append('\n'.join(current_block)) - - return result_blocks + return chunks def _split_into_logical_blocks(self, markdown_text: str) -> List[str]: """ - 将Markdown文本分割成逻辑块(标题、段落、代码块等) - 同时保持原始结构包括空行 - - 参数: - markdown_text: 输入的Markdown文本 - - 返回: - Markdown块列表 + 将Markdown文本分割成逻辑块(标题、段落、代码块、空行分隔符等) """ # 标准化换行符 - markdown_text = markdown_text.replace('\r\n', '\n') + text = markdown_text.replace('\r\n', '\n') - # 首先将代码块与其他内容分开 + # 分割代码块和其他内容 code_block_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~)' - parts = re.split(code_block_pattern, markdown_text) + parts = re.split(code_block_pattern, text) blocks = [] for i, part in enumerate(parts): - if i % 2 == 1: # 代码块 + if not part: + continue + + if i % 2 == 1: # 这是一个代码块 blocks.append(part) - elif part: # 非代码内容 - # 分割成行同时保留空行 - lines = part.split('\n') - current_block = [] - - for line in lines: - # 检查是否是标题 - if re.match(r'^#{1,6}\s+.+', line.strip()): - # 输出已积累的内容 - if current_block: - blocks.append('\n'.join(current_block)) - current_block = [] - # 将标题作为单独块添加 - blocks.append(line) - else: - # 对于非标题行,用适当的换行符积累 - if current_block: - current_block.append(line) - else: - current_block = [line] - - # 添加剩余内容 - if current_block: - blocks.append('\n'.join(current_block)) + else: # 这是普通Markdown内容 + # 按一个或多个空行分割,并保留分隔符 + # 这能有效分离段落、列表、标题等,并保留它们之间的空行 + sub_parts = re.split(r'(\n{2,})', part) + # 过滤掉 re.split 可能产生的空字符串 + blocks.extend([p for p in sub_parts if p]) return blocks def _split_large_block(self, block: str) -> List[str]: """ - 分割超过max_block_size的大块 - 总是在行边界处分割 - - 参数: - block: 大的Markdown块 - - 返回: - 小块组成的列表 + 分割单个超过 max_block_size 的大块 """ - # 特殊处理代码块 - if block.startswith('```') or block.startswith('~~~'): + # 优先处理代码块 + if block.startswith(('```', '~~~')): fence = '```' if block.startswith('```') else '~~~' - parts = block.split('\n') + lines = block.split('\n') + header = lines[0] + footer = lines[-1] + content_lines = lines[1:-1] - # 提取语言说明符(如果存在) - first_line = parts[0] - remaining_lines = parts[1:-1] # 排除开始和结束标记 - closing_fence = parts[-1] + chunks = [] + current_chunk_lines = [header] + current_size = self._get_bytes(header) + 1 - result = [] - current_chunk = [first_line] - current_size = self._get_bytes(first_line) + for line in content_lines: + line_size = self._get_bytes(line) + 1 + if current_size + line_size + self._get_bytes(footer) > self.max_block_size: + current_chunk_lines.append(footer) + chunks.append('\n'.join(current_chunk_lines)) + current_chunk_lines = [header, line] + current_size = self._get_bytes(header) + 1 + line_size + else: + current_chunk_lines.append(line) + current_size += line_size - for line in remaining_lines: - line_len = self._get_bytes(line) + 1 # +1是因为换行符 + if len(current_chunk_lines) > 1: + current_chunk_lines.append(footer) + chunks.append('\n'.join(current_chunk_lines)) + return chunks - if current_size + line_len + self._get_bytes(closing_fence) > self.max_block_size: - # 关闭当前块并开始新块 - result.append('\n'.join(current_chunk + [closing_fence])) - current_chunk = [first_line] # 新块使用相同的开始标记 - current_size = self._get_bytes(first_line) - - current_chunk.append(line) - current_size += line_len - - # 添加最后的块 - if len(current_chunk) > 1: # 有超出开始标记的内容 - result.append('\n'.join(current_chunk + [closing_fence])) - - return result - - # 对于非代码块,简单地在行边界处分割 + # 对普通大文本按行分割 lines = block.split('\n') - result = [] + chunks = [] current_chunk = [] current_size = 0 - for line in lines: - line_len = self._get_bytes(line) + 1 # +1是因为换行符 - - if current_size + line_len > self.max_block_size and current_chunk: - result.append('\n'.join(current_chunk)) + line_size = self._get_bytes(line) + 1 + if current_size + line_size > self.max_block_size and current_chunk: + chunks.append('\n'.join(current_chunk)) current_chunk = [line] - current_size = line_len + current_size = line_size - 1 # -1 for the first line does not have a leading '\n' else: - if current_chunk: - current_chunk.append(line) - current_size += line_len - else: - current_chunk = [line] - current_size = line_len + current_chunk.append(line) + current_size += line_size if current_chunk: - result.append('\n'.join(current_chunk)) + chunks.append('\n'.join(current_chunk)) - return result + return chunks -def split_markdown_text(markdown_text:str, max_block_size=5000): +def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]: """ 将Markdown字符串分割成不超过max_block_size的块 - 可以通过简单拼接重建原始文本(分割的代码块除外) - 尽量保持标题与其对应内容在一起 - - 参数: - markdown_text: 输入的Markdown文本 - max_block_size: 每个块的最大字符数 - - 返回: - 可以通过''.join(chunks)重建的Markdown块列表 """ splitter = MarkdownBlockSplitter(max_block_size=max_block_size) chunks = splitter.split_markdown(markdown_text) - # 过滤空白块 - chunks = [chunk for chunk in chunks if chunk.strip()] - return chunks + # 过滤掉仅由空白字符组成的块 + return [chunk for chunk in chunks if chunk.strip()] -def join_markdown_texts(markdown_texts: list[str]) -> str: - if len(markdown_texts) == 0: return "" - result = markdown_texts[0] - pre = markdown_texts[0] - for text in markdown_texts[1:]: - # 只有表格会收到多余空行的影响 - if text.lstrip().startswith("|") and pre.rstrip().endswith("|"): - result = result + "\n" + text +def _needs_single_newline_join(prev_chunk: str, next_chunk: str) -> bool: + """ + 判断两个块是否应该用单个换行符连接 + 这通常发生在列表、表格、引用块的连续行之间 + """ + if not prev_chunk.strip() or not next_chunk.strip(): + return False + + last_line_prev = prev_chunk.rstrip().split('\n')[-1].lstrip() + first_line_next = next_chunk.lstrip().split('\n')[0].lstrip() + + # 表格 + if last_line_prev.startswith('|') and last_line_prev.endswith('|') and \ + first_line_next.startswith('|') and first_line_next.endswith('|'): + return True + + # 列表 (无序和有序) + list_markers = r'^\s*([-*+]|\d+\.)\s+' + if re.match(list_markers, last_line_prev) and re.match(list_markers, first_line_next): + return True + + # 引用 + if last_line_prev.startswith('>') and first_line_next.startswith('>'): + return True + + return False + + +def join_markdown_texts(markdown_texts: List[str]) -> str: + """ + 智能地拼接Markdown块列表 + """ + if not markdown_texts: + return "" + + joined_text = markdown_texts[0] + for i in range(1, len(markdown_texts)): + prev_chunk = markdown_texts[i - 1] + current_chunk = markdown_texts[i] + + # 判断是否应该用单换行还是双换行 + if _needs_single_newline_join(prev_chunk, current_chunk): + separator = "\n" else: - result += "\n\n" + text - pre = text - return result + # 默认使用双换行来分隔不同的块 + separator = "\n\n" + joined_text += separator + current_chunk -if __name__ == '__main__': - pass + return joined_text \ No newline at end of file