翻译markdown可以传入包含图片的zip压缩包
This commit is contained in:
@@ -6,11 +6,11 @@ import io
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
import threading
|
||||
import uuid
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
|
||||
class MaskDict:
|
||||
@@ -62,7 +62,7 @@ def uris2placeholder(markdown: str, mask_dict: MaskDict):
|
||||
# mask_dict.set(id, match.group(2))
|
||||
# return f"{match.group(1)}(<ph-{id}>)"
|
||||
|
||||
#整个图片都替换为占位符
|
||||
# 整个图片都替换为占位符
|
||||
mask_dict.set(id, match.group())
|
||||
return f"<ph-{id}>"
|
||||
|
||||
@@ -100,86 +100,100 @@ def find_markdown_in_zip(zip_bytes: bytes):
|
||||
raise ValueError("ZIP 中没有 Markdown 文件")
|
||||
|
||||
|
||||
def embed_inline_image_from_zip(zip_bytes: bytes, filename_in_zip: str, encoding="utf-8"):
|
||||
def embed_inline_image_from_zip(zip_bytes: bytes, filename_in_zip: str | None = None, encoding="utf-8"):
|
||||
"""
|
||||
从ZIP文件的字节流中读取一个Markdown文件,并将其中的相对路径图片内联为Base64编码的data URI。
|
||||
|
||||
Args:
|
||||
zip_bytes (bytes): ZIP文件的字节内容。
|
||||
filename_in_zip (str | None, optional):
|
||||
要处理的Markdown文件名。如果为 None,则自动查找并使用ZIP包中的第一个.md或.markdown文件。
|
||||
默认为 None。
|
||||
encoding (str, optional): Markdown文件的编码格式。默认为 "utf-8"。
|
||||
|
||||
Returns:
|
||||
str | None: 包含内联图片的Markdown文本内容,如果发生错误则返回None。
|
||||
"""
|
||||
zip_file_bytes = io.BytesIO(zip_bytes)
|
||||
|
||||
print(f"正在尝试打开内存中的ZIP存档...")
|
||||
print("正在尝试打开内存中的ZIP存档...")
|
||||
with zipfile.ZipFile(zip_file_bytes, 'r') as archive:
|
||||
print(f"ZIP存档已打开。正在查找文件 '{filename_in_zip}'...")
|
||||
print("ZIP存档已打开。")
|
||||
|
||||
if filename_in_zip not in archive.namelist():
|
||||
print(f"错误: 文件 '{filename_in_zip}' 在ZIP压缩包中未找到。")
|
||||
# --- 新增和修改的逻辑 ---
|
||||
target_md_filename = filename_in_zip
|
||||
|
||||
# 如果未指定文件名,则自动查找第一个Markdown文件
|
||||
if target_md_filename is None:
|
||||
print("`正在自动查找第一个Markdown文件...")
|
||||
found_md = None
|
||||
for name in archive.namelist():
|
||||
# 确保它是一个文件(不是目录),并检查扩展名
|
||||
if not name.endswith('/') and name.lower().endswith(('.md', '.markdown')):
|
||||
found_md = name
|
||||
break # 找到第一个就停止
|
||||
|
||||
if found_md:
|
||||
target_md_filename = found_md
|
||||
print(f"已自动选择Markdown文件: '{target_md_filename}'")
|
||||
else:
|
||||
print("错误: ZIP压缩包中未找到任何Markdown文件 (.md 或 .markdown)。")
|
||||
print(f"压缩包中的可用文件列表: {archive.namelist()}")
|
||||
return None
|
||||
|
||||
md_content_bytes = archive.read(filename_in_zip)
|
||||
print(f"文件 '{filename_in_zip}' 已找到并读取。")
|
||||
# 统一检查最终确定的文件是否存在于压缩包中
|
||||
if target_md_filename not in archive.namelist():
|
||||
print(f"错误: 文件 '{target_md_filename}' 在ZIP压缩包中未找到。")
|
||||
print(f"压缩包中的可用文件列表: {archive.namelist()}")
|
||||
return None
|
||||
|
||||
# --- 后续代码使用 target_md_filename ---
|
||||
print(f"正在读取文件 '{target_md_filename}'...")
|
||||
md_content_bytes = archive.read(target_md_filename)
|
||||
print(f"文件 '{target_md_filename}' 已读取。")
|
||||
md_content_text = md_content_bytes.decode(encoding)
|
||||
print(f"文件内容已使用 '{encoding}' 编码成功解码。")
|
||||
|
||||
# --- 新增:处理图片 ---
|
||||
print("开始处理Markdown中的图片...")
|
||||
# 获取Markdown文件在ZIP包内的基本目录,用于解析相对图片路径
|
||||
# 例如,如果 filename_in_zip 是 "docs/guide/full.md", base_md_path_in_zip 是 "docs/guide"
|
||||
# 如果 filename_in_zip 是 "full.md", base_md_path_in_zip 是 ""
|
||||
base_md_path_in_zip = os.path.dirname(filename_in_zip)
|
||||
# 获取Markdown文件在ZIP包内的基本目录
|
||||
base_md_path_in_zip = os.path.dirname(target_md_filename)
|
||||
|
||||
def replace_image_with_base64(match):
|
||||
alt_text = match.group(1)
|
||||
original_image_path = match.group(2)
|
||||
|
||||
# 检查是否是外部链接或已经是data URI
|
||||
if original_image_path.startswith(('http://', 'https://', 'data:')):
|
||||
print(f" 跳过外部或已内联图片: {original_image_path}")
|
||||
return match.group(0) # 返回原始匹配
|
||||
# print(f" 跳过外部或已内联图片: {original_image_path}")
|
||||
return match.group(0)
|
||||
|
||||
# 构建图片在ZIP文件中的绝对路径
|
||||
# os.path.join 会正确处理 base_md_path_in_zip 为空字符串的情况
|
||||
image_path_in_zip = os.path.join(base_md_path_in_zip, original_image_path)
|
||||
# zipfile 使用正斜杠,并且路径是相对于zip根目录的,os.path.normpath确保路径格式正确
|
||||
image_path_in_zip = os.path.normpath(image_path_in_zip).replace(os.sep, '/')
|
||||
|
||||
# 确保路径不是以 './' 开头,如果filename_in_zip在根目录且图片路径也是相对的
|
||||
if image_path_in_zip.startswith('./'):
|
||||
image_path_in_zip = image_path_in_zip[2:]
|
||||
|
||||
# print(f" 尝试内联图片: '{original_image_path}' (解析为ZIP内路径: '{image_path_in_zip}')")
|
||||
|
||||
try:
|
||||
image_bytes = archive.read(image_path_in_zip)
|
||||
|
||||
# 猜测MIME类型
|
||||
mime_type, _ = mimetypes.guess_type(image_path_in_zip)
|
||||
if not mime_type:
|
||||
# 备用:根据扩展名手动判断一些常见类型
|
||||
ext = os.path.splitext(image_path_in_zip)[1].lower()
|
||||
if ext == '.png':
|
||||
mime_type = 'image/png'
|
||||
elif ext in ['.jpg', '.jpeg']:
|
||||
mime_type = 'image/jpeg'
|
||||
elif ext == '.gif':
|
||||
mime_type = 'image/gif'
|
||||
elif ext == '.svg':
|
||||
mime_type = 'image/svg+xml'
|
||||
elif ext == '.webp':
|
||||
mime_type = 'image/webp'
|
||||
else:
|
||||
mime_map = {'.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
|
||||
'.gif': 'image/gif', '.svg': 'image/svg+xml', '.webp': 'image/webp'}
|
||||
mime_type = mime_map.get(ext)
|
||||
|
||||
if not mime_type:
|
||||
print(f" 警告: 无法确定图片 '{image_path_in_zip}' 的MIME类型。跳过内联。")
|
||||
return match.group(0) # 返回原始匹配
|
||||
return match.group(0)
|
||||
|
||||
base64_encoded_data = base64.b64encode(image_bytes).decode('utf-8')
|
||||
new_image_tag = f""
|
||||
# print(f" 成功内联图片: {original_image_path} -> data:{mime_type[:20]}...")
|
||||
return new_image_tag
|
||||
except KeyError:
|
||||
print(f" 警告: 图片 '{image_path_in_zip}' 在ZIP压缩包中未找到。原始链接将被保留。")
|
||||
return match.group(0) # 图片不在zip中,返回原始匹配
|
||||
return match.group(0)
|
||||
except Exception as e_img:
|
||||
print(f" 错误: 处理图片 '{image_path_in_zip}' 时发生错误: {e_img}。原始链接将被保留。")
|
||||
return match.group(0)
|
||||
|
||||
# 正则表达式查找Markdown图片: 
|
||||
# 修改了正则表达式,使其不贪婪地匹配alt文本和路径
|
||||
image_regex = r"!\[(.*?)\]\((.*?)\)"
|
||||
modified_md_content = re.sub(image_regex, replace_image_with_base64, md_content_text)
|
||||
|
||||
@@ -187,26 +201,28 @@ def embed_inline_image_from_zip(zip_bytes: bytes, filename_in_zip: str, encoding
|
||||
return modified_md_content
|
||||
|
||||
|
||||
def unembed_base64_images_to_zip(markdown:str,markdown_name:str,image_folder_name="images")->bytes:
|
||||
def unembed_base64_images_to_zip(markdown: str, markdown_name: str, image_folder_name="images") -> bytes:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
image_folder=os.path.join(temp_dir,image_folder_name)
|
||||
os.makedirs(image_folder,exist_ok=True)
|
||||
pattern=r"!\[(.*?)\]\(data:(.*?);.*base64,(.*)\)"
|
||||
def unembed_base64_images(match:re.Match)->str:
|
||||
image_folder = os.path.join(temp_dir, image_folder_name)
|
||||
os.makedirs(image_folder, exist_ok=True)
|
||||
pattern = r"!\[(.*?)\]\(data:(.*?);.*base64,(.*)\)"
|
||||
|
||||
def unembed_base64_images(match: re.Match) -> str:
|
||||
b64data = match.group(3)
|
||||
extension=mimetypes.guess_extension(match.group(2))
|
||||
image_id=hashlib.md5(b64data.encode()).hexdigest()[:8]
|
||||
image_name=f"{image_id}{extension}"
|
||||
url=f"./{image_folder_name}/{image_name}"
|
||||
#创建对应的image文件
|
||||
with open(os.path.join(image_folder,image_name),"wb") as f:
|
||||
extension = mimetypes.guess_extension(match.group(2))
|
||||
image_id = hashlib.md5(b64data.encode()).hexdigest()[:8]
|
||||
image_name = f"{image_id}{extension}"
|
||||
url = f"./{image_folder_name}/{image_name}"
|
||||
# 创建对应的image文件
|
||||
with open(os.path.join(image_folder, image_name), "wb") as f:
|
||||
f.write(base64.b64decode(b64data))
|
||||
return f""
|
||||
modified_md_content = re.sub(pattern, unembed_base64_images,markdown)
|
||||
with open(os.path.join(temp_dir,f"{markdown_name}"),"w",encoding="utf-8") as f:
|
||||
|
||||
modified_md_content = re.sub(pattern, unembed_base64_images, markdown)
|
||||
with open(os.path.join(temp_dir, f"{markdown_name}"), "w", encoding="utf-8") as f:
|
||||
f.write(modified_md_content)
|
||||
zip_buffer=io.BytesIO()
|
||||
folder_path=Path(temp_dir)
|
||||
zip_buffer = io.BytesIO()
|
||||
folder_path = Path(temp_dir)
|
||||
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
for file in folder_path.rglob('*'):
|
||||
if file.is_file():
|
||||
@@ -216,4 +232,3 @@ def unembed_base64_images_to_zip(markdown:str,markdown_name:str,image_folder_nam
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ from docutranslate.global_values.conditional_import import DOCLING_EXIST
|
||||
from docutranslate.glossary.glossary import Glossary
|
||||
from docutranslate.ir.document import Document
|
||||
from docutranslate.ir.markdown_document import MarkdownDocument
|
||||
from docutranslate.utils.markdown_utils import embed_inline_image_from_zip
|
||||
|
||||
if DOCLING_EXIST:
|
||||
from docutranslate.converter.x2md.converter_docling import ConverterDoclingConfig, ConverterDocling
|
||||
@@ -38,7 +39,8 @@ class MarkdownBasedWorkflow(Workflow[MarkdownBasedWorkflowConfig, Document, Mark
|
||||
HTMLExportable[MD2HTMLExporterConfig],
|
||||
MDFormatsExportable[ExporterConfig]):
|
||||
_converter_factory: dict[
|
||||
ConvertEngineType, Tuple[Type[X2MarkdownConverter|ConverterIdentity], Type[X2MarkdownConverterConfig]] | None] = {
|
||||
ConvertEngineType, Tuple[Type[X2MarkdownConverter | ConverterIdentity], Type[
|
||||
X2MarkdownConverterConfig]] | None] = {
|
||||
"mineru": (ConverterMineru, ConverterMineruConfig),
|
||||
"identity": (ConverterIdentity, None)
|
||||
}
|
||||
@@ -58,11 +60,13 @@ class MarkdownBasedWorkflow(Workflow[MarkdownBasedWorkflowConfig, Document, Mark
|
||||
if self.document_original is None:
|
||||
raise RuntimeError("File has not been read yet. Call read_path or read_bytes first.")
|
||||
|
||||
if self.document_original.suffix.lower() == ".zip":
|
||||
self.document_original = self._get_md_from_zip(self.document_original)
|
||||
# 获取缓存的解析后文件
|
||||
document_cached = md_based_convert_cacher.get_cached_result(self.document_original, convert_engin,
|
||||
convert_config)
|
||||
if document_cached:
|
||||
self.attachment.add_document("md_cached",document_cached.copy())
|
||||
self.attachment.add_document("md_cached", document_cached.copy())
|
||||
return document_cached
|
||||
|
||||
# 未缓存则解析文件
|
||||
@@ -74,8 +78,8 @@ class MarkdownBasedWorkflow(Workflow[MarkdownBasedWorkflowConfig, Document, Mark
|
||||
converter = converter_class(convert_config)
|
||||
else:
|
||||
raise ValueError(f"不存在{convert_engin}解析引擎")
|
||||
document_md :Document= converter.convert(self.document_original)
|
||||
if hasattr(converter,"attachments"):
|
||||
document_md: Document = converter.convert(self.document_original)
|
||||
if hasattr(converter, "attachments"):
|
||||
for attachment in converter.attachments:
|
||||
self.attachment.add_attachment(attachment)
|
||||
# 缓存解析后文件
|
||||
@@ -83,8 +87,15 @@ class MarkdownBasedWorkflow(Workflow[MarkdownBasedWorkflowConfig, Document, Mark
|
||||
|
||||
return document_md
|
||||
|
||||
def _get_md_from_zip(self, document: Document) -> Document:
|
||||
assert document.suffix.lower() == ".zip"
|
||||
self.logger.info("传入zip文件,正在自动组合markdown文本与图片")
|
||||
content_byte = embed_inline_image_from_zip(document.content).encode()
|
||||
return document.from_bytes(content_byte, suffix=".md", stem=document.stem)
|
||||
|
||||
def _pre_translate(self, document: Document):
|
||||
convert_engine: ConvertEngineType = "identity" if document.suffix == ".md" else self.convert_engine
|
||||
convert_engine: ConvertEngineType = "identity" if document.suffix.lower() in [".md", ".markdown",
|
||||
".zip"] else self.convert_engine
|
||||
convert_config = self.config.converter_config
|
||||
translator_config = self.config.translator_config
|
||||
translator = MDTranslator(translator_config)
|
||||
|
||||
Reference in New Issue
Block a user