新的markdown分块实现
This commit is contained in:
@@ -8,7 +8,7 @@ class MarkdownBlockSplitter:
|
|||||||
初始化Markdown分块器
|
初始化Markdown分块器
|
||||||
|
|
||||||
参数:
|
参数:
|
||||||
max_block_size: 每个块的最大字符数
|
max_block_size: 每个块的最大字节数
|
||||||
"""
|
"""
|
||||||
self.max_block_size = max_block_size
|
self.max_block_size = max_block_size
|
||||||
|
|
||||||
@@ -16,241 +16,190 @@ class MarkdownBlockSplitter:
|
|||||||
def _get_bytes(text: str) -> int:
|
def _get_bytes(text: str) -> int:
|
||||||
return len(text.encode('utf-8'))
|
return len(text.encode('utf-8'))
|
||||||
|
|
||||||
# TODO: 修复分块有时候会有空白块的问题
|
|
||||||
def split_markdown(self, markdown_text: str) -> List[str]:
|
def split_markdown(self, markdown_text: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
将Markdown文本分割成指定大小的块
|
将Markdown文本分割成指定大小的块
|
||||||
确保可以通过简单拼接重建原始文本(分割的代码块除外)
|
确保可以通过简单拼接重建原始文本(分割的代码块除外)
|
||||||
尽量保持标题与其对应内容在同一个块中
|
尽量保持标题与其对应内容在同一个块中
|
||||||
|
|
||||||
参数:
|
|
||||||
markdown_text: 输入的Markdown文本
|
|
||||||
|
|
||||||
返回:
|
|
||||||
Markdown块组成的列表
|
|
||||||
"""
|
"""
|
||||||
# 首先将文本分割成逻辑块并保持结构
|
# 1. 将文本分割成逻辑块
|
||||||
blocks = self._split_into_logical_blocks(markdown_text)
|
logical_blocks = self._split_into_logical_blocks(markdown_text)
|
||||||
|
|
||||||
# 然后合并块,同时遵守大小限制并保持标题与内容在一起
|
# 2. 合并逻辑块,使其不超过 max_block_size
|
||||||
result_blocks = []
|
chunks = []
|
||||||
current_block = []
|
current_chunk_parts = []
|
||||||
current_size = 0
|
current_size = 0
|
||||||
pending_heading = None # 等待内容的标题
|
|
||||||
|
|
||||||
for block in blocks:
|
for block in logical_blocks:
|
||||||
block_size = self._get_bytes(block)
|
block_size = self._get_bytes(block)
|
||||||
is_heading = bool(re.match(r'^#{1,6}\s+.+', block.strip()))
|
|
||||||
is_separator = block.strip() == '' and block.count('\n') > 0
|
|
||||||
|
|
||||||
# 情况1:块本身过大,无法单独放入
|
# 情况1:块本身就过大
|
||||||
if block_size > self.max_block_size:
|
if block_size > self.max_block_size:
|
||||||
# 先输出已积累的内容
|
# 先将当前积累的块输出
|
||||||
if current_block:
|
if current_chunk_parts:
|
||||||
result_blocks.append('\n'.join(current_block))
|
chunks.append("".join(current_chunk_parts))
|
||||||
current_block = []
|
current_chunk_parts = []
|
||||||
current_size = 0
|
current_size = 0
|
||||||
pending_heading = None
|
|
||||||
|
|
||||||
# 分割大块并添加所有部分
|
# 分割这个超大块并直接添加到结果中
|
||||||
large_block_parts = self._split_large_block(block)
|
chunks.extend(self._split_large_block(block))
|
||||||
result_blocks.extend(large_block_parts)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 情况2:添加此块会超出大小限制
|
# 情况2:将此块添加到当前chunk会超限
|
||||||
if current_size + block_size + (1 if current_block else 0) > self.max_block_size:
|
if current_size + block_size > self.max_block_size:
|
||||||
# 如果有等待内容的标题,尝试将其与内容保持在一起
|
if current_chunk_parts:
|
||||||
if pending_heading and not is_heading and not is_separator:
|
chunks.append("".join(current_chunk_parts))
|
||||||
# 如果只添加标题和此块能放下,则这样做
|
|
||||||
if self._get_bytes(pending_heading) + block_size + 1 <= self.max_block_size:
|
|
||||||
result_blocks.append('\n'.join(current_block[:-1])) # 输出不含标题的内容
|
|
||||||
current_block = [pending_heading, block]
|
|
||||||
current_size = self._get_bytes(pending_heading) + 1 + block_size
|
|
||||||
pending_heading = None
|
|
||||||
continue
|
|
||||||
|
|
||||||
# 否则输出当前块并开始新块
|
current_chunk_parts = [block]
|
||||||
if current_block:
|
|
||||||
result_blocks.append('\n'.join(current_block))
|
|
||||||
|
|
||||||
current_block = [block]
|
|
||||||
current_size = block_size
|
current_size = block_size
|
||||||
pending_heading = block if is_heading else None
|
# 情况3:正常添加
|
||||||
continue
|
|
||||||
|
|
||||||
# 情况3:正常情况 - 添加到当前块
|
|
||||||
if current_block:
|
|
||||||
current_block.append(block)
|
|
||||||
current_size += 1 + block_size # 加1是因为换行符
|
|
||||||
else:
|
else:
|
||||||
current_block.append(block)
|
current_chunk_parts.append(block)
|
||||||
current_size = block_size
|
current_size += block_size
|
||||||
|
|
||||||
# 更新等待标题状态
|
# 添加最后一个剩余的chunk
|
||||||
if is_heading:
|
if current_chunk_parts:
|
||||||
pending_heading = block
|
chunks.append("".join(current_chunk_parts))
|
||||||
elif not is_separator and pending_heading:
|
|
||||||
# 已在标题后添加内容,清除等待状态
|
|
||||||
pending_heading = None
|
|
||||||
|
|
||||||
# 添加最后一个块(如果存在)
|
return chunks
|
||||||
if current_block:
|
|
||||||
result_blocks.append('\n'.join(current_block))
|
|
||||||
|
|
||||||
return result_blocks
|
|
||||||
|
|
||||||
def _split_into_logical_blocks(self, markdown_text: str) -> List[str]:
|
def _split_into_logical_blocks(self, markdown_text: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
将Markdown文本分割成逻辑块(标题、段落、代码块等)
|
将Markdown文本分割成逻辑块(标题、段落、代码块、空行分隔符等)
|
||||||
同时保持原始结构包括空行
|
|
||||||
|
|
||||||
参数:
|
|
||||||
markdown_text: 输入的Markdown文本
|
|
||||||
|
|
||||||
返回:
|
|
||||||
Markdown块列表
|
|
||||||
"""
|
"""
|
||||||
# 标准化换行符
|
# 标准化换行符
|
||||||
markdown_text = markdown_text.replace('\r\n', '\n')
|
text = markdown_text.replace('\r\n', '\n')
|
||||||
|
|
||||||
# 首先将代码块与其他内容分开
|
# 分割代码块和其他内容
|
||||||
code_block_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~)'
|
code_block_pattern = r'(```[\s\S]*?```|~~~[\s\S]*?~~~)'
|
||||||
parts = re.split(code_block_pattern, markdown_text)
|
parts = re.split(code_block_pattern, text)
|
||||||
|
|
||||||
blocks = []
|
blocks = []
|
||||||
for i, part in enumerate(parts):
|
for i, part in enumerate(parts):
|
||||||
if i % 2 == 1: # 代码块
|
if not part:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if i % 2 == 1: # 这是一个代码块
|
||||||
blocks.append(part)
|
blocks.append(part)
|
||||||
elif part: # 非代码内容
|
else: # 这是普通Markdown内容
|
||||||
# 分割成行同时保留空行
|
# 按一个或多个空行分割,并保留分隔符
|
||||||
lines = part.split('\n')
|
# 这能有效分离段落、列表、标题等,并保留它们之间的空行
|
||||||
current_block = []
|
sub_parts = re.split(r'(\n{2,})', part)
|
||||||
|
# 过滤掉 re.split 可能产生的空字符串
|
||||||
for line in lines:
|
blocks.extend([p for p in sub_parts if p])
|
||||||
# 检查是否是标题
|
|
||||||
if re.match(r'^#{1,6}\s+.+', line.strip()):
|
|
||||||
# 输出已积累的内容
|
|
||||||
if current_block:
|
|
||||||
blocks.append('\n'.join(current_block))
|
|
||||||
current_block = []
|
|
||||||
# 将标题作为单独块添加
|
|
||||||
blocks.append(line)
|
|
||||||
else:
|
|
||||||
# 对于非标题行,用适当的换行符积累
|
|
||||||
if current_block:
|
|
||||||
current_block.append(line)
|
|
||||||
else:
|
|
||||||
current_block = [line]
|
|
||||||
|
|
||||||
# 添加剩余内容
|
|
||||||
if current_block:
|
|
||||||
blocks.append('\n'.join(current_block))
|
|
||||||
|
|
||||||
return blocks
|
return blocks
|
||||||
|
|
||||||
def _split_large_block(self, block: str) -> List[str]:
|
def _split_large_block(self, block: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
分割超过max_block_size的大块
|
分割单个超过 max_block_size 的大块
|
||||||
总是在行边界处分割
|
|
||||||
|
|
||||||
参数:
|
|
||||||
block: 大的Markdown块
|
|
||||||
|
|
||||||
返回:
|
|
||||||
小块组成的列表
|
|
||||||
"""
|
"""
|
||||||
# 特殊处理代码块
|
# 优先处理代码块
|
||||||
if block.startswith('```') or block.startswith('~~~'):
|
if block.startswith(('```', '~~~')):
|
||||||
fence = '```' if block.startswith('```') else '~~~'
|
fence = '```' if block.startswith('```') else '~~~'
|
||||||
parts = block.split('\n')
|
lines = block.split('\n')
|
||||||
|
header = lines[0]
|
||||||
|
footer = lines[-1]
|
||||||
|
content_lines = lines[1:-1]
|
||||||
|
|
||||||
# 提取语言说明符(如果存在)
|
chunks = []
|
||||||
first_line = parts[0]
|
current_chunk_lines = [header]
|
||||||
remaining_lines = parts[1:-1] # 排除开始和结束标记
|
current_size = self._get_bytes(header) + 1
|
||||||
closing_fence = parts[-1]
|
|
||||||
|
|
||||||
result = []
|
for line in content_lines:
|
||||||
current_chunk = [first_line]
|
line_size = self._get_bytes(line) + 1
|
||||||
current_size = self._get_bytes(first_line)
|
if current_size + line_size + self._get_bytes(footer) > self.max_block_size:
|
||||||
|
current_chunk_lines.append(footer)
|
||||||
|
chunks.append('\n'.join(current_chunk_lines))
|
||||||
|
current_chunk_lines = [header, line]
|
||||||
|
current_size = self._get_bytes(header) + 1 + line_size
|
||||||
|
else:
|
||||||
|
current_chunk_lines.append(line)
|
||||||
|
current_size += line_size
|
||||||
|
|
||||||
for line in remaining_lines:
|
if len(current_chunk_lines) > 1:
|
||||||
line_len = self._get_bytes(line) + 1 # +1是因为换行符
|
current_chunk_lines.append(footer)
|
||||||
|
chunks.append('\n'.join(current_chunk_lines))
|
||||||
|
return chunks
|
||||||
|
|
||||||
if current_size + line_len + self._get_bytes(closing_fence) > self.max_block_size:
|
# 对普通大文本按行分割
|
||||||
# 关闭当前块并开始新块
|
|
||||||
result.append('\n'.join(current_chunk + [closing_fence]))
|
|
||||||
current_chunk = [first_line] # 新块使用相同的开始标记
|
|
||||||
current_size = self._get_bytes(first_line)
|
|
||||||
|
|
||||||
current_chunk.append(line)
|
|
||||||
current_size += line_len
|
|
||||||
|
|
||||||
# 添加最后的块
|
|
||||||
if len(current_chunk) > 1: # 有超出开始标记的内容
|
|
||||||
result.append('\n'.join(current_chunk + [closing_fence]))
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
# 对于非代码块,简单地在行边界处分割
|
|
||||||
lines = block.split('\n')
|
lines = block.split('\n')
|
||||||
result = []
|
chunks = []
|
||||||
current_chunk = []
|
current_chunk = []
|
||||||
current_size = 0
|
current_size = 0
|
||||||
|
|
||||||
for line in lines:
|
for line in lines:
|
||||||
line_len = self._get_bytes(line) + 1 # +1是因为换行符
|
line_size = self._get_bytes(line) + 1
|
||||||
|
if current_size + line_size > self.max_block_size and current_chunk:
|
||||||
if current_size + line_len > self.max_block_size and current_chunk:
|
chunks.append('\n'.join(current_chunk))
|
||||||
result.append('\n'.join(current_chunk))
|
|
||||||
current_chunk = [line]
|
current_chunk = [line]
|
||||||
current_size = line_len
|
current_size = line_size - 1 # -1 for the first line does not have a leading '\n'
|
||||||
else:
|
else:
|
||||||
if current_chunk:
|
current_chunk.append(line)
|
||||||
current_chunk.append(line)
|
current_size += line_size
|
||||||
current_size += line_len
|
|
||||||
else:
|
|
||||||
current_chunk = [line]
|
|
||||||
current_size = line_len
|
|
||||||
|
|
||||||
if current_chunk:
|
if current_chunk:
|
||||||
result.append('\n'.join(current_chunk))
|
chunks.append('\n'.join(current_chunk))
|
||||||
|
|
||||||
return result
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
def split_markdown_text(markdown_text:str, max_block_size=5000):
|
def split_markdown_text(markdown_text: str, max_block_size=5000) -> List[str]:
|
||||||
"""
|
"""
|
||||||
将Markdown字符串分割成不超过max_block_size的块
|
将Markdown字符串分割成不超过max_block_size的块
|
||||||
可以通过简单拼接重建原始文本(分割的代码块除外)
|
|
||||||
尽量保持标题与其对应内容在一起
|
|
||||||
|
|
||||||
参数:
|
|
||||||
markdown_text: 输入的Markdown文本
|
|
||||||
max_block_size: 每个块的最大字符数
|
|
||||||
|
|
||||||
返回:
|
|
||||||
可以通过''.join(chunks)重建的Markdown块列表
|
|
||||||
"""
|
"""
|
||||||
splitter = MarkdownBlockSplitter(max_block_size=max_block_size)
|
splitter = MarkdownBlockSplitter(max_block_size=max_block_size)
|
||||||
chunks = splitter.split_markdown(markdown_text)
|
chunks = splitter.split_markdown(markdown_text)
|
||||||
# 过滤空白块
|
# 过滤掉仅由空白字符组成的块
|
||||||
chunks = [chunk for chunk in chunks if chunk.strip()]
|
return [chunk for chunk in chunks if chunk.strip()]
|
||||||
return chunks
|
|
||||||
|
|
||||||
|
|
||||||
def join_markdown_texts(markdown_texts: list[str]) -> str:
|
def _needs_single_newline_join(prev_chunk: str, next_chunk: str) -> bool:
|
||||||
if len(markdown_texts) == 0: return ""
|
"""
|
||||||
result = markdown_texts[0]
|
判断两个块是否应该用单个换行符连接
|
||||||
pre = markdown_texts[0]
|
这通常发生在列表、表格、引用块的连续行之间
|
||||||
for text in markdown_texts[1:]:
|
"""
|
||||||
# 只有表格会收到多余空行的影响
|
if not prev_chunk.strip() or not next_chunk.strip():
|
||||||
if text.lstrip().startswith("|") and pre.rstrip().endswith("|"):
|
return False
|
||||||
result = result + "\n" + text
|
|
||||||
|
last_line_prev = prev_chunk.rstrip().split('\n')[-1].lstrip()
|
||||||
|
first_line_next = next_chunk.lstrip().split('\n')[0].lstrip()
|
||||||
|
|
||||||
|
# 表格
|
||||||
|
if last_line_prev.startswith('|') and last_line_prev.endswith('|') and \
|
||||||
|
first_line_next.startswith('|') and first_line_next.endswith('|'):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 列表 (无序和有序)
|
||||||
|
list_markers = r'^\s*([-*+]|\d+\.)\s+'
|
||||||
|
if re.match(list_markers, last_line_prev) and re.match(list_markers, first_line_next):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 引用
|
||||||
|
if last_line_prev.startswith('>') and first_line_next.startswith('>'):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def join_markdown_texts(markdown_texts: List[str]) -> str:
|
||||||
|
"""
|
||||||
|
智能地拼接Markdown块列表
|
||||||
|
"""
|
||||||
|
if not markdown_texts:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
joined_text = markdown_texts[0]
|
||||||
|
for i in range(1, len(markdown_texts)):
|
||||||
|
prev_chunk = markdown_texts[i - 1]
|
||||||
|
current_chunk = markdown_texts[i]
|
||||||
|
|
||||||
|
# 判断是否应该用单换行还是双换行
|
||||||
|
if _needs_single_newline_join(prev_chunk, current_chunk):
|
||||||
|
separator = "\n"
|
||||||
else:
|
else:
|
||||||
result += "\n\n" + text
|
# 默认使用双换行来分隔不同的块
|
||||||
pre = text
|
separator = "\n\n"
|
||||||
return result
|
|
||||||
|
|
||||||
|
joined_text += separator + current_chunk
|
||||||
|
|
||||||
if __name__ == '__main__':
|
return joined_text
|
||||||
pass
|
|
||||||
Reference in New Issue
Block a user