from __future__ import annotations

import json
import logging
import time
import zipfile
from pathlib import Path

import requests

logger = logging.getLogger(__name__)

MINERU_BASE = "https://mineru.net/api/v4"

TERMINAL_STATES = {"done", "failed"}

# Human-readable labels (logged as-is) for the non-terminal batch states.
IN_PROGRESS_STATE_LABELS = {
    "waiting-file": "等待文件上传",
    "pending": "排队中",
    "running": "解析中",
    "converting": "格式转换中",
}


class MineruClientError(RuntimeError):
    """Raised when any step of the MinerU workflow fails."""


class MineruClient:
    """Client for the MinerU precise-parsing API (requires a token).

    Full call flow for a local image file:
      1. POST /file-urls/batch  -> obtain batch_id + signed OSS upload URL
      2. PUT the image to OSS   -> the service detects the upload and
         submits the parsing task automatically
      3. GET /extract-results/batch/{batch_id}, polled until state=done
      4. Download full_zip_url, unpack it and load the structured JSON

    File limits: <= 200MB, <= 600 pages
    Supported formats: PDF, images (png/jpg/jpeg/jp2/webp/gif/bmp),
    Doc, Docx, Ppt, PPTx
    """

    def __init__(
        self,
        api_key: str,
        model_version: str = "vlm",
        language: str = "ch",
        enable_table: bool = True,
        is_ocr: bool = True,
        enable_formula: bool = True,
        poll_interval: float = 3.0,
        timeout: float = 300.0,
    ) -> None:
        """Store the credentials and parsing options.

        ``poll_interval`` is the sleep (seconds) between status queries and
        ``timeout`` is the overall polling budget (seconds). The remaining
        options are forwarded verbatim in the batch-creation payload.
        """
        self.api_key = api_key
        self.model_version = model_version
        self.language = language
        self.enable_table = enable_table
        self.is_ocr = is_ocr
        self.enable_formula = enable_formula
        self.poll_interval = poll_interval
        self.timeout = timeout

    def parse_image(self, image_path: Path, output_dir: Path) -> dict:
        """Parse a local image file and return the structured JSON data.

        Parameters
        ----------
        image_path:
            Path of the local image (png/jpg/jpeg/jp2/webp/gif/bmp).
        output_dir:
            Directory for intermediate artifacts (the zip and its
            extracted contents); created if missing.

        Returns
        -------
        dict
            The structured JSON document that contains ``pdf_info``.

        Raises
        ------
        FileNotFoundError
            If ``image_path`` does not exist.
        MineruClientError
            If any API step fails, times out, or yields no usable JSON.
        """
        image_path = Path(image_path)
        if not image_path.exists():
            raise FileNotFoundError(f"图片文件不存在: {image_path}")

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        logger.info("MinerU 精准解析开始: %s", image_path.name)

        batch_id, upload_url = self._request_upload_url(image_path.name)
        logger.info("MinerU 批次已创建: batch_id=%s", batch_id)

        self._upload_file(upload_url, image_path)
        logger.info("MinerU 文件上传完成: %s(系统自动提交解析)", image_path.name)

        zip_url = self._poll_batch_until_done(batch_id)
        logger.info("MinerU 解析完成: batch_id=%s", batch_id)

        zip_path = self._download_zip(zip_url, output_dir)
        extract_dir = output_dir / "result"
        self._extract_zip(zip_path, extract_dir)

        result = self._load_structured_json(extract_dir)
        logger.info("MinerU 结构化 JSON 加载完毕")
        return result

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _auth_headers(self) -> dict[str, str]:
        """Return the bearer-token headers used for MinerU API calls."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _unwrap_response(resp: requests.Response, error_prefix: str) -> dict:
        """Decode an API response envelope and return its ``data`` object.

        Raises ``MineruClientError`` (message prefixed with ``error_prefix``)
        when the body is not JSON, is not a JSON object, or carries a
        non-zero ``code``. A missing or null ``data`` field yields ``{}`` so
        callers can use ``.get`` unconditionally.
        """
        try:
            body = resp.json()
        except ValueError as exc:
            # Every JSON decode error raised by requests derives from
            # ValueError, so this also covers older requests releases.
            raise MineruClientError(f"{error_prefix}: {exc}") from exc

        if not isinstance(body, dict):
            raise MineruClientError(f"{error_prefix}: {body!r}")
        if body.get("code") != 0:
            raise MineruClientError(f"{error_prefix}: {body.get('msg')}")

        data = body.get("data")
        return data if isinstance(data, dict) else {}

    def _request_upload_url(self, file_name: str) -> tuple[str, str]:
        """Request a batch upload link; return ``(batch_id, oss_upload_url)``.

        Raises ``MineruClientError`` on network/HTTP failure, on an API-level
        error, or when the response lacks a batch id or upload URL.
        """
        payload = {
            "files": [{"name": file_name, "is_ocr": self.is_ocr}],
            "model_version": self.model_version,
            "language": self.language,
            "enable_table": self.enable_table,
            "enable_formula": self.enable_formula,
        }
        try:
            resp = requests.post(
                f"{MINERU_BASE}/file-urls/batch",
                headers=self._auth_headers(),
                json=payload,
                timeout=30,
            )
            resp.raise_for_status()
        except requests.RequestException as exc:
            raise MineruClientError(f"MinerU 申请上传 URL 失败: {exc}") from exc

        data = self._unwrap_response(resp, "MinerU 申请上传 URL 失败")
        batch_id = data.get("batch_id")
        file_urls = data.get("file_urls") or []
        if not batch_id or not file_urls:
            raise MineruClientError("MinerU 返回的 batch_id 或 file_urls 为空")
        # Exactly one file was submitted, so only the first URL is relevant.
        return batch_id, file_urls[0]

    def _upload_file(self, upload_url: str, image_path: Path) -> None:
        """PUT the image to OSS; no Content-Type header is set for the upload.

        Raises ``MineruClientError`` on a network error or when the response
        status is neither 200 nor 201.
        """
        try:
            with image_path.open("rb") as f:
                resp = requests.put(upload_url, data=f, timeout=120)
        except requests.RequestException as exc:
            raise MineruClientError(f"MinerU 文件上传网络错误: {exc}") from exc

        if resp.status_code not in (200, 201):
            raise MineruClientError(
                f"MinerU 文件上传失败: HTTP {resp.status_code} {resp.text[:200]}"
            )

    def _poll_batch_until_done(self, batch_id: str) -> str:
        """Poll the batch result until it finishes; return ``full_zip_url``.

        Sleeps ``self.poll_interval`` seconds between queries and gives up
        once ``self.timeout`` seconds have elapsed.

        Raises ``MineruClientError`` if a query fails, the batch reports
        ``failed``, ``done`` arrives without a zip URL, or polling times out.
        """
        url = f"{MINERU_BASE}/extract-results/batch/{batch_id}"
        deadline = time.monotonic() + self.timeout

        while time.monotonic() < deadline:
            try:
                resp = requests.get(url, headers=self._auth_headers(), timeout=30)
                resp.raise_for_status()
            except requests.RequestException as exc:
                raise MineruClientError(f"MinerU 查询批次状态失败: {exc}") from exc

            data = self._unwrap_response(resp, "MinerU 查询批次失败")
            results: list[dict] = data.get("extract_result") or []

            # An empty result list means nothing to report yet; keep waiting.
            if results:
                item = results[0]
                state = item.get("state", "")
                label = IN_PROGRESS_STATE_LABELS.get(state, state)
                logger.info(
                    "MinerU 批次状态: batch_id=%s state=%s (%s)",
                    batch_id,
                    state,
                    label,
                )

                if state == "done":
                    zip_url = item.get("full_zip_url")
                    if not zip_url:
                        raise MineruClientError("MinerU 完成但未返回 full_zip_url")
                    return zip_url

                if state == "failed":
                    err_msg = item.get("err_msg") or "未知错误"
                    raise MineruClientError(f"MinerU 解析失败: {err_msg}")

            time.sleep(self.poll_interval)

        raise MineruClientError(
            f"MinerU 轮询超时 ({self.timeout:.0f}s): batch_id={batch_id}"
        )

    def _download_zip(self, zip_url: str, output_dir: Path) -> Path:
        """Download the result zip into ``output_dir`` and return its path.

        Raises ``MineruClientError`` on network or HTTP failure.
        """
        target = output_dir / "mineru_result.zip"
        try:
            # The context manager closes the streamed response, releasing the
            # connection on both the success and the error path.
            with requests.get(zip_url, timeout=120, stream=True) as resp:
                resp.raise_for_status()
                with target.open("wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        f.write(chunk)
        except requests.RequestException as exc:
            raise MineruClientError(f"MinerU zip 下载失败: {exc}") from exc

        logger.info("MinerU zip 下载完毕: %s", target)
        return target

    def _extract_zip(self, zip_path: Path, extract_dir: Path) -> None:
        """Extract every member of ``zip_path`` into ``extract_dir``."""
        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(extract_dir)
        logger.info("MinerU zip 解压完毕: %s", extract_dir)

    def _load_structured_json(self, extract_dir: Path) -> dict:
        """Find and load the structured JSON that contains ``pdf_info``.

        Layout of the MinerU zip:
          layout.json          -> intermediate result (same as middle.json)
          *_content_list.json  -> content list
          *_model.json         -> model inference result
          full.md              -> Markdown parsing result

        Files are tried from the most to the least specific name pattern;
        the first one that parses to a dict whose ``pdf_info`` is a list is
        returned. Unreadable, non-UTF-8 or malformed files are skipped.

        Raises ``MineruClientError`` when no file qualifies.
        """
        patterns = (
            "layout.json",
            "*layout*.json",
            "*_content_list*.json",
            "*.json",
        )
        # dict.fromkeys drops paths matched by several patterns while
        # keeping the priority order established above.
        candidates = dict.fromkeys(
            path
            for pattern in patterns
            for path in sorted(extract_dir.rglob(pattern))
        )

        for candidate in candidates:
            try:
                parsed = json.loads(candidate.read_text(encoding="utf-8"))
            except (ValueError, OSError):
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for files that are not UTF-8.
                continue
            if isinstance(parsed, dict) and isinstance(parsed.get("pdf_info"), list):
                logger.info("MinerU 结构化 JSON 选用: %s", candidate.name)
                return parsed

        raise MineruClientError(
            "MinerU 结果 zip 中未找到包含 pdf_info 的结构化 JSON"
        )