修复图片后不换行的问题

This commit is contained in:
xunbu
2025-12-14 10:50:13 +08:00
parent b63bd6fc73
commit a7c265de55

View File

@@ -1,7 +1,11 @@
# SPDX-FileCopyrightText: 2025 QinHan # SPDX-FileCopyrightText: 2025 QinHan
# SPDX-License-Identifier: MPL-2.0 # SPDX-License-Identifier: MPL-2.0
import re import re
from typing import List, Tuple, Optional from typing import List, Tuple
# 定义一个特殊的合并标记,用于在 separators 中标记“这是一个被切断的代码块,需要无缝合并”
# 包含生僻字符防止冲突
MERGE_CODE_TOKEN = "\u0000<!--MD_SPLIT_MERGE-->\u0000"
def is_placeholder(text: str) -> bool: def is_placeholder(text: str) -> bool:
@@ -12,22 +16,36 @@ def is_placeholder(text: str) -> bool:
class MarkdownBlockSplitter: class MarkdownBlockSplitter:
def __init__(self, max_block_size: int = 5000): def __init__(self, max_block_size: int = 5000):
self.max_block_size = max_block_size self.max_block_size = max_block_size
# 匹配 代码块 或 占位符
self.special_token_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~|<ph-[a-zA-Z0-9]+>)' self.special_token_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~|<ph-[a-zA-Z0-9]+>)'
@staticmethod @staticmethod
def _get_bytes(text: str) -> int: def _get_bytes(text: str) -> int:
return len(text.encode('utf-8')) return len(text.encode('utf-8'))
def split_with_layout(self, markdown_text: str) -> Tuple[List[str], List[str]]: def _split_text_by_bytes(self, text: str, limit: int) -> List[str]:
""" """兜底逻辑:按字节强制切分"""
分割Markdown并返回 (内容块列表, 分隔符列表) encoded = text.encode('utf-8')
separators[i] 是 chunks[i] 和 chunks[i+1] 之间的原始文本 if len(encoded) <= limit:
""" return [text]
# 1. 细粒度切分:将文本切分为 [Block, Separator, Block, Separator...] result = []
raw_blocks, raw_separators = self._tokenize(markdown_text) start = 0
total_len = len(encoded)
while start < total_len:
end = min(start + limit, total_len)
if end < total_len:
while end > start:
try:
encoded[start:end].decode('utf-8')
break
except UnicodeDecodeError:
end -= 1
chunk_str = encoded[start:end].decode('utf-8')
result.append(chunk_str)
start = end
return result
# 2. 聚合:将小的 Block 合并为大的 Chunk同时合并中间的 Separator def split_with_layout(self, markdown_text: str) -> Tuple[List[str], List[str]]:
raw_blocks, raw_separators = self._tokenize(markdown_text)
chunks = [] chunks = []
final_separators = [] final_separators = []
@@ -41,207 +59,212 @@ class MarkdownBlockSplitter:
next_block = raw_blocks[i + 1] next_block = raw_blocks[i + 1]
separator = raw_separators[i] separator = raw_separators[i]
next_block_size = self._get_bytes(next_block) # 注意合并标记本身不占翻译配额计算大小时可以忽略或按0计算
# 但为了安全,我们按实际长度算(它很短)
separator_size = self._get_bytes(separator) separator_size = self._get_bytes(separator)
next_block_size = self._get_bytes(next_block)
# 判断是否需要切分
# 1. 遇到占位符,强制切分(为了保护图片不被混入翻译文本中)
# 2. 当前块 + 分隔符 + 下一块 超过最大限制
if is_placeholder(current_chunk) or is_placeholder(next_block) or \ if is_placeholder(current_chunk) or is_placeholder(next_block) or \
(current_size + separator_size + next_block_size > self.max_block_size): (current_size + separator_size + next_block_size > self.max_block_size):
# 结束当前块
chunks.append(current_chunk) chunks.append(current_chunk)
# 记录连接到下一块的分隔符
final_separators.append(separator) final_separators.append(separator)
# 开始新块
current_chunk = next_block current_chunk = next_block
current_size = next_block_size current_size = next_block_size
else: else:
# 合并
# 新的当前块 = 旧当前块 + 分隔符 + 下一块
current_chunk += separator + next_block current_chunk += separator + next_block
current_size += separator_size + next_block_size current_size += separator_size + next_block_size
# 添加最后一个块
chunks.append(current_chunk) chunks.append(current_chunk)
return chunks, final_separators return chunks, final_separators
def _tokenize(self, text: str) -> Tuple[List[str], List[str]]: def _tokenize(self, text: str) -> Tuple[List[str], List[str]]:
"""
将文本初步标记化为逻辑单元。
逻辑单元包括:代码块、占位符、普通段落。
单元之间的所有字符(通常是空白)都被视为分隔符。
"""
text = text.replace('\r\n', '\n') text = text.replace('\r\n', '\n')
# 1. 按 代码块 和 占位符 初步切分
# re.split 包含捕获组时,结果列表为: [Text, Token, Text, Token, Text]
parts = re.split(self.special_token_pattern, text) parts = re.split(self.special_token_pattern, text)
blocks = [] # 存储逻辑内容块 blocks = []
separators = [] # 存储块之间的分隔符 separators = []
# 临时缓冲区,用于处理 split 产生的纯文本部分
def process_text_part(text_part):
if not text_part:
return []
# 对普通文本,按段落(双换行)再次切分
# 我们需要保留切分符,所以用捕获组
sub_parts = re.split(r'(\n{2,})', text_part)
return sub_parts
# 初始化:处理第一个部分
# 整个流程是一个状态机,我们在寻找 "Content" -> "Separator" -> "Content" 的链条
# 为了简化逻辑,我们先把 parts 扁平化为一个 token 流
# 流中的元素要么是重要Token(Code/PH),要么是普通文本(Text)
flat_tokens = []
for i, part in enumerate(parts):
if not part:
continue
if re.match(self.special_token_pattern, part):
flat_tokens.append({'type': 'special', 'text': part})
else:
# 普通文本,继续细分段落
sub_parts = process_text_part(part)
for sp in sub_parts:
if not sp: continue
# 只有双换行才被明确视为分隔符逻辑,单换行通常归于段落内
# 但为了精准还原,我们把所有 re.split 出来的项都视为独立单元
flat_tokens.append({'type': 'text', 'text': sp})
if not flat_tokens:
return [], []
# 接下来进行 "Whitespace Shifting" (空白归约)
# 我们希望 block 是纯净的内容separator 是 block 之间的空白
# 例如: "Text \n <ph>" -> Block="Text", Sep=" \n ", Block="<ph>"
normalized_blocks = []
normalized_separators = []
current_block_text = ""
pending_separator = "" pending_separator = ""
for i, token in enumerate(flat_tokens): def add_safe_block(content):
content = token['text'] """添加普通文本块"""
nonlocal pending_separator
if self._get_bytes(content) <= self.max_block_size:
if blocks:
separators.append(pending_separator)
pending_separator = ""
blocks.append(content)
else:
# 文本如果还超限,只能强行字节切分(无法包裹,因为是普通文本)
sub_chunks = self._split_text_by_bytes(content, self.max_block_size)
for idx, sub in enumerate(sub_chunks):
sep = pending_separator if idx == 0 else ""
if blocks: separators.append(sep)
if idx == 0: pending_separator = ""
blocks.append(sub)
# 如果是特殊块(代码/占位符),它本身就是核心内容,前后不能有粘连 for part in parts:
if token['type'] == 'special': if not part: continue
if current_block_text:
normalized_blocks.append(current_block_text) # === A. 特殊块 (代码/图片) ===
normalized_separators.append(pending_separator) if re.match(self.special_token_pattern, part):
current_block_text = "" # 1. 正常大小:直接添加
if self._get_bytes(part) < self.max_block_size:
if blocks:
separators.append(pending_separator)
pending_separator = ""
else:
pending_separator = ""
blocks.append(part)
continue
# 2. 超大代码块:进行“合成拆分”
# 判断是否是图片占位符(图片占位符理论上不会超大,但为了严谨)
if is_placeholder(part):
# 图片无法拆分,只能强制字节切断(极罕见情况)
add_safe_block(part)
continue
# 是超大代码块,解析头尾
# part 格式如: ```python\ncode...\n```
lines = part.split('\n')
header = lines[0] # e.g., ```python
footer = lines[-1] # e.g., ```
# 获取中间的纯代码内容
# 注意:如果 split 后只有一行或两行,说明结构有问题,直接降级
if len(lines) < 2:
add_safe_block(part)
continue
inner_content = "\n".join(lines[1:-1])
# 将中间内容按行切分
inner_parts = re.split(r'(\n)', inner_content)
# 重新组合成多个小的完整代码块
current_sub_block_content = ""
# 结算 pending_separator (代码块之前的文本分隔符)
if blocks:
separators.append(pending_separator)
pending_separator = ""
else:
pending_separator = "" pending_separator = ""
normalized_blocks.append(content) # 遍历代码内容行
# 特殊块处理完,它的位置占住了,接下来的空白应该算作 separator first_chunk = True
# 但我们需要看下一个 token 是啥。
# 简单处理:将特殊块直接加入,接下来的文本如果是空白,就是 separator for sub in inner_parts:
# 尝试构建当前块Header + Current + sub + Footer
# 我们需要预估加上这一行后,是否会超限
# 预估大小 = Header + \n + Current + sub + \n + Footer
potential_content = current_sub_block_content + sub
# 构造一个假想的完整块来测大小
synthetic_block = f"{header}\n{potential_content}\n{footer}"
if self._get_bytes(synthetic_block) > self.max_block_size:
# 如果加上这行就超了,先把之前的 current_sub_block_content 封包
if current_sub_block_content:
wrapped_block = f"{header}\n{current_sub_block_content}\n{footer}"
blocks.append(wrapped_block)
# 添加特殊合并标记(不是空字符串,而是 MERGE_CODE_TOKEN
separators.append(MERGE_CODE_TOKEN)
first_chunk = False
# 新的一块开始
current_sub_block_content = sub
else:
current_sub_block_content += sub
# 添加最后剩余的部分
if current_sub_block_content:
wrapped_block = f"{header}\n{current_sub_block_content}\n{footer}"
# 如果前面已经有分块了,这里需要加 merge token
# 如果这是唯一的块(虽然逻辑上不可能走到这),也不影响
if not first_chunk:
separators.append(MERGE_CODE_TOKEN)
blocks.append(wrapped_block)
# 代码块处理完,无需 pending_separator因为这是原子的
continue continue
# 如果是普通文本 # === B. 普通文本 ===
# 检查是否全是空白(这是分隔符候选) sub_parts = re.split(r'(\n)', part)
if not content.strip(): for sub in sub_parts:
# 如果当前没有积累的 block这可能是开头的空白或者是两个 special 块之间的空白 if not sub: continue
if not normalized_blocks and not current_block_text: if sub == '\n' or not sub.strip():
# 忽略文件开头的空白,或者附加到下一个块? pending_separator += sub
# 为了对齐 list 长度,通常忽略开头,或者视为第一个块的一部分(如果不翻译) continue
pass
elif normalized_blocks and not current_block_text:
# 前面已经有一个完整块,现在还没开始新块,这个空白是 separator
# 如果之前已经有 pending_separator则叠加
if len(normalized_separators) < len(normalized_blocks):
normalized_separators.append(content)
else:
# 这种情况应该少见,追加到上一个 separator
normalized_separators[-1] += content
else:
# current_block_text 正在积累,遇到了空白
# 比如 "Hello \n\n World" 中的 \n\n
# 结束当前块
normalized_blocks.append(current_block_text)
current_block_text = ""
normalized_separators.append(content)
else:
# 是有内容的文本
# 剥离前导空白(归入上一个分隔符)和尾随空白(归入下一个分隔符)?
# 简单起见,利用 rstrip 将尾部空白视为分隔符的一部分
# 更好的策略: lstripped = sub.lstrip()
# 文本 token 自身可能包含换行(段落内)。 leading_ws = sub[:len(sub) - len(lstripped)]
# 我们只在 tokenize 阶段切分了 \n{2,}。 body = lstripped.rstrip()
# 所以 content 基本是一个完整的段落或代码块周围的文本。 trailing_ws = lstripped[len(body):]
# 如果上一个块已经结束 (normalized_blocks > normalized_separators),说明缺分隔符 pending_separator += leading_ws
if len(normalized_blocks) > len(normalized_separators): add_safe_block(body)
# 这意味着两个非空文本紧挨着?理论上 tokenize 阶段应该切开了 pending_separator = trailing_ws
normalized_separators.append("")
# 剥离尾部空白作为 potential separator while len(separators) < len(blocks) - 1:
stripped = content.rstrip() separators.append("\n")
trailing_space = content[len(stripped):]
if current_block_text: return blocks, separators
# 合并到当前正在构建的段落极少发生因为我们按split切分
current_block_text += content
else:
# 新的文本块
# 但要注意,如果这个文本块前面有空白,那个空白已经在上面处理了
# 这里只需要处理自己
normalized_blocks.append(stripped)
if trailing_space:
# 这个尾部空白暂时存起来,看后面接什么
# 实际上在我们的循环模型里,直接视为 separator 比较安全
# 除非它是文件结尾
if i < len(flat_tokens) - 1:
normalized_separators.append(trailing_space)
else:
# 文件末尾的空白,可以忽略或加回 block
normalized_blocks[-1] += trailing_space
# 修正长度separators 数量应该是 blocks - 1
while len(normalized_separators) < len(normalized_blocks) - 1:
normalized_separators.append("\n\n") # 默认 fallback
return normalized_blocks, normalized_separators
def split_markdown_with_layout(markdown_text: str, max_block_size=5000) -> Tuple[List[str], List[str]]: def split_markdown_with_layout(markdown_text: str, max_block_size=5000) -> Tuple[List[str], List[str]]:
"""
外部调用的主入口
返回: (chunks, separators)
"""
splitter = MarkdownBlockSplitter(max_block_size=max_block_size) splitter = MarkdownBlockSplitter(max_block_size=max_block_size)
return splitter.split_with_layout(markdown_text) return splitter.split_with_layout(markdown_text)
def join_markdown_with_layout(chunks: List[str], separators: List[str]) -> str: def join_markdown_with_layout(chunks: List[str], separators: List[str]) -> str:
""" """
使用保存的分隔符还原 Markdown 还原 Markdown
重点:检测 MERGE_CODE_TOKEN如果遇到则剥离 Chunk A 的尾巴和 Chunk B 的头,
实现无缝拼接。
""" """
if not chunks: if not chunks:
return "" return ""
result = chunks[0] result = chunks[0]
for i in range(len(separators)): for i in range(len(separators)):
# 安全检查,防止索引越界(虽然 split 保证了长度对应)
sep = separators[i] if i < len(separators) else "\n\n" sep = separators[i] if i < len(separators) else "\n\n"
next_chunk = chunks[i + 1] if i + 1 < len(chunks) else "" next_chunk = chunks[i + 1] if i + 1 < len(chunks) else ""
result += sep + next_chunk
if sep == MERGE_CODE_TOKEN:
# === 执行代码块无缝合并 ===
# 上一块:剥离末尾的 ```
# 下一块:剥离包含语言标识的 ```lang
# 1. 处理 result (当前累积的文本,结尾应该是 ```)
# 使用 rstrip 仅仅去掉空白,然后去掉最后三个反引号
# 正则:匹配末尾的 ``` 以及可能的空白
result = re.sub(r'```\s*$', '', result.rstrip())
# 2. 处理 next_chunk (开头应该是 ```python 或 ```)
# 正则:匹配开头的 ```加可选语言标识 加换行
# 注意:翻译后的 next_chunk 可能包含前面多余的空行,先 lstrip 比较安全
# 但也不能 lstrip 太多导致缩进丢失。代码块的 fence 通常顶格或跟随缩进。
# 这里简单处理顶格的情况。
next_chunk = re.sub(r'^\s*```.*\n', '', next_chunk, count=1)
# 直接拼接,不加换行符,因为 split 时 content 包含了换行
result += next_chunk
else:
# 普通拼接
result += sep + next_chunk
return result return result
# 兼容旧接口,防止其他地方报错 # 兼容旧接口
def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]: def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]:
chunks, _ = split_markdown_with_layout(markdown_text, max_block_size) chunks, _ = split_markdown_with_layout(markdown_text, max_block_size)
return chunks return chunks
def join_markdown_texts(markdown_texts: List[str]) -> str: def join_markdown_texts(markdown_texts: List[str]) -> str:
# 旧接口只能猜,建议尽量使用新接口
return "\n\n".join(markdown_texts) return "\n\n".join(markdown_texts)