508 lines
20 KiB
Python
508 lines
20 KiB
Python
"""Core processing pipeline: AI → PDF → PNG → Qwen crop → MinerU → validate."""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
from backend.app.barcode_detector import detect_barcodes
|
||
from backend.app.image_classifier import is_qr_code
|
||
from backend.app.mineru_client import MineruClient, MineruClientError
|
||
from backend.app.mineru_parser import parse_mineru_fields, parse_mineru_image_blocks
|
||
from backend.app.text_validation import validate_field_against_word
|
||
from backend.app.word_parser import extract_word_html, extract_word_text
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Environment helpers #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def _get_mineru_api_key() -> str:
|
||
"""Read MINERU_API_KEY from the process environment or the project .env file."""
|
||
value = os.environ.get("MINERU_API_KEY", "").strip()
|
||
if value:
|
||
return value
|
||
|
||
for candidate in (
|
||
Path(__file__).resolve().parents[2] / ".env",
|
||
Path(__file__).resolve().parents[3] / ".env",
|
||
):
|
||
if not candidate.exists():
|
||
continue
|
||
for raw in candidate.read_text(encoding="utf-8").splitlines():
|
||
line = raw.strip()
|
||
if not line or line.startswith("#") or "=" not in line:
|
||
continue
|
||
key, val = line.split("=", 1)
|
||
if key.strip() == "MINERU_API_KEY":
|
||
cleaned = val.strip().strip('"').strip("'")
|
||
if cleaned:
|
||
logger.info("Loaded MINERU_API_KEY from %s", candidate)
|
||
return cleaned
|
||
return ""
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# AI → PDF conversion #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def _ai_to_pdf(ai_path: Path, output_dir: Path) -> Path:
|
||
"""Convert an Adobe Illustrator file to PDF, keeping the original filename stem.
|
||
|
||
Modern .ai files (CS and later) are internally PDF-based; pypdf can copy
|
||
them directly. Legacy EPS-based .ai files require Ghostscript.
|
||
If the uploaded file is already a PDF it is copied as-is.
|
||
"""
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
pdf_path = output_dir / f"{ai_path.stem}.pdf"
|
||
|
||
with ai_path.open("rb") as fh:
|
||
header = fh.read(8)
|
||
|
||
if header.startswith(b"%PDF-"):
|
||
# PDF-based .ai or an actual PDF – re-write with pypdf for cleanliness
|
||
try:
|
||
from pypdf import PdfReader, PdfWriter
|
||
|
||
reader = PdfReader(str(ai_path))
|
||
writer = PdfWriter()
|
||
for page in reader.pages:
|
||
writer.add_page(page)
|
||
with pdf_path.open("wb") as fh:
|
||
writer.write(fh)
|
||
logger.info("Converted PDF-based .ai via pypdf: %s", ai_path.name)
|
||
except Exception as exc:
|
||
logger.warning("pypdf failed (%s), falling back to direct copy", exc)
|
||
shutil.copy2(ai_path, pdf_path)
|
||
else:
|
||
# Legacy EPS-based .ai → Ghostscript
|
||
gs = shutil.which("/opt/homebrew/bin/gs") or shutil.which("gs") or shutil.which("ghostscript")
|
||
if gs is None:
|
||
raise RuntimeError(
|
||
"Cannot convert legacy .ai file: Ghostscript is not installed. "
|
||
"Run: brew install ghostscript"
|
||
)
|
||
import subprocess
|
||
|
||
result = subprocess.run(
|
||
[gs, "-dNOPAUSE", "-dBATCH", "-dSAFER",
|
||
"-sDEVICE=pdfwrite", f"-sOutputFile={pdf_path}", str(ai_path)],
|
||
capture_output=True, text=True, timeout=120,
|
||
)
|
||
if result.returncode != 0:
|
||
raise RuntimeError(
|
||
f"Ghostscript failed (exit {result.returncode}):\n{result.stderr.strip()}"
|
||
)
|
||
logger.info("Converted legacy .ai via Ghostscript: %s", ai_path.name)
|
||
|
||
return pdf_path
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# PDF → PNG rasterisation #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def _pdf_to_png(pdf_path: Path, output_dir: Path, dpi: int = 150) -> Path:
    """Rasterise the first page of a PDF to a PNG.

    Tries, in order:
      1. Ghostscript (if installed)
      2. PyMuPDF (pip install pymupdf)

    Uses a safe output filename ``page1.png`` to avoid issues with special
    characters in the source PDF name.

    Parameters
    ----------
    pdf_path:
        Source PDF; only the first page is rendered.
    output_dir:
        Destination directory; created if missing.
    dpi:
        Render resolution in dots per inch.

    Returns
    -------
    Path
        The generated ``page1.png``.

    Raises
    ------
    RuntimeError
        If neither Ghostscript nor PyMuPDF can produce the PNG.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    # Use a safe filename – special chars / spaces in the PDF stem can cause
    # Ghostscript to silently produce no output.
    png_path = output_dir / "page1.png"

    # ── 1. Ghostscript ────────────────────────────────────────────────────── #
    # Include plain "gs" so Linux installs found via PATH are not skipped
    # (matches the lookup in _ai_to_pdf).
    gs = (
        shutil.which("/opt/homebrew/bin/gs")
        or shutil.which("/usr/local/bin/gs")
        or shutil.which("gs")
        or shutil.which("ghostscript")
    )
    if gs:
        result = subprocess.run(
            [
                gs, "-dNOPAUSE", "-dBATCH", "-dSAFER",
                "-sDEVICE=png16m", f"-r{dpi}",
                "-dFirstPage=1", "-dLastPage=1",
                f"-sOutputFile={png_path}", str(pdf_path),
            ],
            capture_output=True, text=True, timeout=60,
        )
        if result.returncode == 0 and png_path.exists():
            w, h = _png_size(png_path)
            logger.info(
                "Rasterised PDF → PNG via Ghostscript at %d DPI: %dx%d px (%d KB)",
                dpi, w, h, png_path.stat().st_size // 1024,
            )
            return png_path
        # Fall through to PyMuPDF rather than aborting the pipeline.
        logger.warning("Ghostscript rasterisation failed (exit %d): %s",
                       result.returncode, result.stderr[:300])

    # ── 2. PyMuPDF fallback ───────────────────────────────────────────────── #
    try:
        import fitz  # PyMuPDF

        doc = fitz.open(str(pdf_path))
        page = doc[0]
        zoom = dpi / 72.0  # PDF user space is 72 pt per inch
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        pix.save(str(png_path))
        doc.close()
        w, h = _png_size(png_path)
        logger.info(
            "Rasterised PDF → PNG via PyMuPDF at %d DPI: %dx%d px (%d KB)",
            dpi, w, h, png_path.stat().st_size // 1024,
        )
        return png_path
    except ImportError as exc:
        # Chain the cause so the missing-dependency trace is preserved.
        raise RuntimeError(
            "Cannot rasterise PDF to PNG: neither Ghostscript nor PyMuPDF is "
            "available. Run: pip install pymupdf OR brew install ghostscript"
        ) from exc
    except Exception as exc:
        raise RuntimeError(f"Cannot rasterise PDF to PNG: {exc}") from exc
|
||
|
||
|
||
def _png_size(png_path: Path) -> tuple[int, int]:
    """Return (width, height) in pixels of a PNG file."""
    from PIL import Image

    with Image.open(png_path) as im:
        width, height = im.size
    return (width, height)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Qwen VL region crop #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def _crop_label_region(png_path: Path, output_dir: Path) -> Path:
    """Detect the main label area with Qwen VL and crop to it.

    If DASHSCOPE_API_KEY is missing or detection fails, returns the original
    PNG unchanged so the pipeline continues without interruption.

    Parameters
    ----------
    png_path:
        Full-page rasterised PNG.
    output_dir:
        Directory for the cropped output; created if missing.

    Returns
    -------
    Path
        ``output_dir / "cropped_label.png"`` on success, otherwise *png_path*.
    """
    # Imported lazily so this module still loads when the region detector's
    # optional dependencies are absent.
    from backend.app.region_detector import (
        _get_api_key,
        crop_and_save,
        detect_regions,
        merge_regions,
    )

    api_key = _get_api_key()
    if not api_key:
        logger.info("DASHSCOPE_API_KEY not configured – skipping AI crop, using full image")
        return png_path

    try:
        regions, _ = detect_regions(png_path, api_key=api_key, api_max_side=1024)
    except Exception as exc:
        # Graceful degradation: any detector failure falls back to the full image.
        logger.warning("Qwen region detection failed (%s) – using full image", exc)
        return png_path

    if not regions:
        logger.warning("No regions detected by Qwen – using full image")
        return png_path

    merged = merge_regions(regions)
    output_dir.mkdir(parents=True, exist_ok=True)
    cropped_png = output_dir / "cropped_label.png"

    # crop_and_save writes to numbered files; rename for predictability
    results = crop_and_save(png_path, [merged], output_dir / "_tmp")
    if not results:
        return png_path

    # shutil is imported at module level; the old `import shutil as _sh`
    # function-local re-import was redundant.
    shutil.move(results[0]["path"], str(cropped_png))

    w, h = _png_size(cropped_png)
    logger.info(
        "Qwen crop: bbox=(%d,%d)-(%d,%d) → %s (%dx%d px)",
        merged.x1, merged.y1, merged.x2, merged.y2,
        cropped_png.name, w, h,
    )
    return cropped_png
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# MinerU image-block QR processing #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def _process_image_blocks(
    mineru_data: dict,
    source_image: Path,
    output_dir: Path,
) -> list[dict]:
    """Run the QR-code recognition flow on every image-type block MinerU parsed.

    Flow
    ----
    1. Extract every image block (with bbox coordinates) from *mineru_data*.
    2. Crop the matching region out of *source_image* (the hi-res cropped PNG)
       and save it as a temporary PNG.
    3. Ask Qwen VL whether the cropped picture is a QR code / barcode.
    4. If the answer is "yes", run the zxing barcode module for exact decoding.
    5. Return one processing result per image block.

    Parameters
    ----------
    mineru_data:
        MinerU structured JSON (contains ``pdf_info``).
    source_image:
        Hi-res source image used for cropping (the PNG that was sent to MinerU).
    output_dir:
        Directory for the temporary crop images.

    Returns
    -------
    list of dict
        One entry per image block, containing:
        - page, block_type, x0_pt, top_pt, x1_pt, bottom_pt
        - is_qr_code : bool — semantic judgement from the VL model
        - barcodes : list — zxing decode results (empty when is_qr_code=False)
        - crop_path : str — relative path of the crop image (for debugging)
    """
    from PIL import Image

    image_blocks = parse_mineru_image_blocks(mineru_data)
    if not image_blocks:
        return []

    output_dir.mkdir(parents=True, exist_ok=True)
    results: list[dict] = []

    with Image.open(source_image) as src_img:
        img_w, img_h = src_img.size

        for idx, block in enumerate(image_blocks, start=1):
            # ── Crop ──────────────────────────────────────────────────────── #
            # Clamp the bbox to the image bounds before cropping.
            x0 = max(0, int(block["x0_pt"]))
            y0 = max(0, int(block["top_pt"]))
            x1 = min(img_w, int(block["x1_pt"]))
            y1 = min(img_h, int(block["bottom_pt"]))

            # Degenerate bbox (zero/negative area after clamping) → record a
            # "no QR" result and move on instead of crashing the pipeline.
            if x1 <= x0 or y1 <= y0:
                logger.warning(
                    "_process_image_blocks: block %d 边界框无效 (%d,%d)-(%d,%d),跳过",
                    idx, x0, y0, x1, y1,
                )
                results.append({**block, "is_qr_code": False, "barcodes": [], "crop_path": None})
                continue

            crop = src_img.crop((x0, y0, x1, y1))
            crop_file = output_dir / f"block_{idx:03d}_p{block['page']}.png"
            crop.save(crop_file)
            logger.info(
                "_process_image_blocks: block %d saved crop %s (%dx%d px)",
                idx, crop_file.name, x1 - x0, y1 - y0,
            )

            # ── Qwen VL semantic check ────────────────────────────────────── #
            qr_detected = is_qr_code(crop_file)

            # ── Barcode decoding (only when the semantic check said "QR") ── #
            barcodes: list[dict] = []
            if qr_detected:
                logger.info(
                    "_process_image_blocks: block %d 被识别为二维码,启动条码解码",
                    idx,
                )
                raw_barcodes = detect_barcodes(crop_file)
                barcodes = [
                    {
                        "format": b.format,
                        "format_label": b.format_label,
                        "text": b.text,
                        "x0": b.x0,
                        "y0": b.y0,
                        "x1": b.x1,
                        "y1": b.y1,
                        "valid": b.valid,
                    }
                    for b in raw_barcodes
                ]
                if barcodes:
                    logger.info(
                        "_process_image_blocks: block %d 条码解码成功,共 %d 条",
                        idx, len(barcodes),
                    )
                else:
                    # VL model said "QR" but zxing could not decode — keep the
                    # positive flag with an empty decode list.
                    logger.warning(
                        "_process_image_blocks: block %d 语义判断为二维码,但 zxing 未能解码",
                        idx,
                    )

            results.append(
                {
                    **block,
                    "is_qr_code": qr_detected,
                    "barcodes": barcodes,
                    "crop_path": str(crop_file),
                }
            )

    return results
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Public API #
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def process_document(
    ai_path: Path,
    word_path: Path,
    output_dir: Path,
    job_id: str,
) -> dict:
    """Full pipeline: AI → PDF → PNG → Qwen crop → MinerU → validate.

    Steps
    -----
    1. AI / PDF file → clean PDF
    2. PDF → high-res PNG (Ghostscript, 150 DPI)
    3. PNG → Qwen VL detects main label area → cropped PNG
       (graceful fallback to full PNG when key is absent)
    4. Cropped PNG → MinerU structured-JSON extraction
    5. MinerU fields → validate against Word reference document

    Parameters
    ----------
    ai_path:
        Uploaded .ai/.pdf artwork file.
    word_path:
        Word reference document used for field validation.
    output_dir:
        Per-job working directory; all intermediate artefacts land here.
    job_id:
        Job identifier embedded in the file URLs returned to the frontend.

    Returns
    -------
    dict
        ``{ preview: {...}, fields: [...] }`` matching the frontend
        ``ProcessResponse`` type. ``preview.type`` is ``"png"`` and
        ``pageWidthPt`` / ``pageHeightPt`` hold the cropped image dimensions
        in pixels (coord system is pixel-aligned for the PNG overlay).

    Raises
    ------
    RuntimeError
        If MINERU_API_KEY is not configured, or PDF conversion /
        rasterisation fails.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # ── 1. AI → PDF ──────────────────────────────────────────────────────── #
    logger.info("Step 1/5 – Converting AI to PDF: %s", ai_path.name)
    pdf_path = _ai_to_pdf(ai_path, output_dir)

    # ── 2. PDF → PNG ─────────────────────────────────────────────────────── #
    logger.info("Step 2/5 – Rasterising PDF to PNG (150 DPI)")
    png_path = _pdf_to_png(pdf_path, output_dir / "raster", dpi=150)

    # ── 3. Qwen VL crop ───────────────────────────────────────────────────── #
    # May return png_path unchanged when the crop is skipped or fails.
    logger.info("Step 3/5 – AI region detection & crop")
    cropped_path = _crop_label_region(png_path, output_dir / "crop")

    # Relative URL fragment understood by /api/files/{job_id}/{file_path}
    cropped_rel = cropped_path.relative_to(output_dir).as_posix()
    img_w, img_h = _png_size(cropped_path)

    # ── 3b. Barcode detection ─────────────────────────────────────────────── #
    logger.info("Step 3b – Scanning for barcodes / QR codes")
    barcodes = detect_barcodes(cropped_path)

    # Crop each barcode region for frontend display
    barcode_crops_dir = output_dir / "barcode_crops"
    barcode_crops_dir.mkdir(parents=True, exist_ok=True)
    from PIL import Image as _PILImage  # noqa: PLC0415
    with _PILImage.open(cropped_path) as _src_img:
        _src_w, _src_h = _src_img.size
        for _bi, _b in enumerate(barcodes):
            # Pad the bbox by a small margin, clamped to the image bounds,
            # so the crop shows a little context around the code.
            _pad = 12
            _cx0 = max(0, _b.x0 - _pad)
            _cy0 = max(0, _b.y0 - _pad)
            _cx1 = min(_src_w, _b.x1 + _pad)
            _cy1 = min(_src_h, _b.y1 + _pad)
            _crop = _src_img.crop((_cx0, _cy0, _cx1, _cy1))
            # Index _bi matches the enumerate() index used for crop_url below.
            _crop.save(barcode_crops_dir / f"barcode_{_bi}.png")

    barcode_results = [
        {
            "format": b.format,
            "format_label": b.format_label,
            "text": b.text,
            "x0": b.x0,
            "y0": b.y0,
            "x1": b.x1,
            "y1": b.y1,
            "valid": b.valid,
            "crop_url": f"/api/files/{job_id}/barcode_crops/barcode_{i}.png",
        }
        for i, b in enumerate(barcodes)
    ]
    logger.info("Step 3b – Found %d barcode(s)", len(barcode_results))

    # ── 4. MinerU parsing ────────────────────────────────────────────────── #
    logger.info("Step 4/5 – Sending cropped PNG to MinerU: %s", cropped_path.name)
    mineru_api_key = _get_mineru_api_key()
    if not mineru_api_key:
        raise RuntimeError("MINERU_API_KEY is not configured")

    mineru_dir = output_dir / "mineru"
    client = MineruClient(api_key=mineru_api_key)
    mineru_data = client.parse_image(cropped_path, mineru_dir)

    # ── 5. Parse + validate ───────────────────────────────────────────────── #
    logger.info("Step 5/5 – Parsing MinerU result and validating against Word")
    doc = parse_mineru_fields(mineru_data)
    word_text = extract_word_text(word_path)
    word_html = extract_word_html(word_path)

    fields: list[dict] = []
    for idx, field in enumerate(doc.fields, start=1):
        validation = validate_field_against_word(field["text"], word_text)
        fields.append(
            {
                "id": f"field-{idx}",
                **field,
                "normalized_text": validation.normalized_text,
                "validation_status": validation.status,
                "validation_reason": validation.reason,
                "matched_excerpt": validation.matched_excerpt,
            }
        )

    # Sort problem fields first, then by reading order (page, top, left).
    _STATUS_RANK = {"matched": 0, "unmatched": 1, "empty_or_garbled": 2}
    fields.sort(key=lambda f: (
        _STATUS_RANK.get(f["validation_status"], 9),
        f["page"],
        f["top_pt"],
        f["x0_pt"],
    ))

    logger.info(
        "Pipeline done: job_id=%s fields=%d matched=%d unmatched=%d garbled=%d",
        job_id,
        len(fields),
        sum(1 for f in fields if f["validation_status"] == "matched"),
        sum(1 for f in fields if f["validation_status"] == "unmatched"),
        sum(1 for f in fields if f["validation_status"] == "empty_or_garbled"),
    )

    # ── 5b. Image blocks: QR semantic check → barcode decode ─────────────── #
    image_block_results = _process_image_blocks(
        mineru_data=mineru_data,
        source_image=cropped_path,
        output_dir=output_dir / "image_blocks",
    )
    logger.info("Step 5b – Processed %d image block(s) from MinerU", len(image_block_results))

    return {
        "preview": {
            # type='png': frontend renders <img> + overlay (not PDF canvas)
            "type": "png",
            "url": f"/api/files/{job_id}/{cropped_rel}",
            # For PNG the "pt" fields carry pixel dimensions so overlay
            # scale factors remain 1:1 at 100% zoom.
            "pageWidthPt": img_w,
            "pageHeightPt": img_h,
        },
        "fields": fields,
        "word_text": word_text,
        "word_html": word_html,
        "barcodes": barcode_results,
        "image_blocks": image_block_results,
    }
|