"""PDF parsing module using Docling."""
from __future__ import annotations
import csv
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
import fitz # PyMuPDF
from paperrag.config import ParserConfig
logger = logging.getLogger(__name__)

# Lower-case keywords used to recognise standard paper sections in headings.
# Order is significant: _normalise_heading scans this list front to back and
# stops at the first keyword contained in the heading.
SECTION_NAMES = [
    "abstract",
    "introduction",
    "background",
    "related work",
    "method", "methods", "methodology",
    "approach",
    "experiment", "experiments",
    "results", "result",
    "discussion",
    "conclusion", "conclusions",
    "references",
    "acknowledgement", "acknowledgements",
    "appendix",
]
def _import_docling():
    """Load the Docling classes on demand.

    The imports live inside the function so that commands which never index
    PDFs do not pay for Docling's heavy vision dependencies.

    Returns:
        The tuple ``(DocumentConverter, PdfFormatOption, InputFormat,
        PdfPipelineOptions)``.
    """
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions

    docling_api = (
        DocumentConverter,
        PdfFormatOption,
        InputFormat,
        PdfPipelineOptions,
    )
    return docling_api
[docs]
@dataclass
class ParsedSection:
    """One named section of a parsed PDF together with its text."""

    name: str  # section label, e.g. a normalised heading or "Full Text"
    text: str  # body text collected under that heading
[docs]
@dataclass
class ParsedPaper:
    """Everything extracted from one PDF: identity, metadata and content."""

    # Identity of the source file
    file_path: str
    file_hash: str

    # Bibliographic metadata
    title: str
    authors: str

    # Extracted content; each instance gets its own empty section list
    sections: list[ParsedSection] = field(default_factory=list)
    raw_text: str = ""
    abstract: str = ""
    doi: str = ""
[docs]
def compute_file_hash(path: Path) -> str:
    """Return the SHA256 hex digest of the file at *path*.

    The file is read in 8192-byte chunks, so it is never held in memory
    as a whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
[docs]
def has_text_layer(pdf_path: Path, min_chars: int = 100) -> bool:
    """Detect if PDF has extractable text (i.e., not a scanned image).

    Only the first page is inspected, which keeps the check fast.

    Args:
        pdf_path: Path to PDF file
        min_chars: Minimum characters on first page to consider text-based

    Returns:
        True if PDF has text layer, False if likely scanned/image-based.
        False is also returned for a document with no pages and when the
        file cannot be opened or read.
    """
    try:
        # The context manager closes the document on every exit path,
        # including the zero-page return and extraction failures.
        with fitz.open(pdf_path) as doc:
            if len(doc) == 0:
                return False
            text = doc[0].get_text().strip()
    except Exception as e:
        logger.warning("Failed to detect text layer in %s: %s", pdf_path.name, e)
        # On error, assume needs OCR (safe default)
        return False
    # If first page has substantial text, assume text-based PDF
    return len(text) >= min_chars
[docs]
def compute_file_hashes_parallel(pdf_paths: list[Path], n_workers: int = 4) -> dict[str, str]:
    """Hash many PDFs concurrently on a thread pool.

    Args:
        pdf_paths: List of PDF file paths to hash
        n_workers: Number of parallel worker threads

    Returns:
        Dictionary mapping str(pdf_path) to hash string. A file whose
        hashing raises is logged and left out of the dictionary.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        pending = {}
        for path in pdf_paths:
            pending[pool.submit(compute_file_hash, path)] = path

        # Collect results in completion order rather than submission order.
        for done in as_completed(pending):
            source = pending[done]
            try:
                digest = done.result()
            except Exception as err:
                logger.error("Failed to hash %s: %s", source.name, err)
            else:
                results[str(source)] = digest
    return results
def _normalise_heading(heading: str) -> str | None:
    """Map a heading onto a known section name.

    The heading is stripped and lower-cased, then the first entry of
    ``SECTION_NAMES`` that occurs anywhere inside it is returned in title
    case. ``None`` is returned when no entry matches.
    """
    candidate = heading.strip().lower()
    matches = (known.title() for known in SECTION_NAMES if known in candidate)
    return next(matches, None)
[docs]
def load_manifest(manifest_path: Path) -> dict[str, dict[str, str]]:
    """Load CSV manifest with metadata to skip parsing.

    Expected columns: filename, title, authors, abstract (optional), doi (optional)

    Rows with an empty filename are skipped. A column that is absent from
    the header, or a cell missing from a short row, takes its default:
    'Unknown' for title and authors, '' for abstract and doi. Loading is
    best-effort: any failure is logged as a warning and the entries read so
    far are returned.

    Args:
        manifest_path: Path to the CSV manifest (UTF-8, with or without BOM)

    Returns:
        dict: {filename: {title, authors, abstract, doi}}
    """
    defaults = {'title': 'Unknown', 'authors': 'Unknown', 'abstract': '', 'doi': ''}
    manifest: dict[str, dict[str, str]] = {}
    try:
        # utf-8-sig drops a leading BOM that would otherwise corrupt the first
        # header name; newline='' lets the csv module handle line endings,
        # including newlines embedded in quoted fields.
        with open(manifest_path, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                filename = (row.get('filename') or '').strip()
                if not filename:
                    continue
                entry = {}
                for column, default in defaults.items():
                    value = row.get(column)
                    # DictReader fills cells missing from short rows with None.
                    entry[column] = default if value is None else value.strip()
                manifest[filename] = entry
        logger.info("Loaded manifest with %d entries from %s", len(manifest), manifest_path)
    except Exception as e:
        logger.warning("Failed to load manifest %s: %s", manifest_path, e)
    return manifest
def _extract_title_from_doc(doc) -> str:
    """Guess the paper title from a Docling document.

    A non-empty ``doc.title`` wins. Otherwise the Markdown export is scanned
    line by line and the first acceptable candidate is returned, cut to 200
    characters:

    * a ``# `` heading, unless it is a short (<= 100 chars) line containing
      a comma, which is taken to be metadata such as an author list;
    * a non-heading line longer than 10 characters with no commas, or with
      at most two commas when the line is longer than 50 characters.

    Returns "Unknown" when nothing qualifies.
    """
    declared = getattr(doc, "title", None)
    if declared:
        return str(declared).strip()

    # Substrings marking conversion artefacts (HTML comments, image and
    # figure placeholders) rather than real text.
    noise_markers = ("<!---", "--->", "<!--", "-->", "image", "figure")

    for raw_line in doc.export_to_markdown().splitlines():
        text = raw_line.strip()
        if not text:
            continue
        lowered = text.lower()
        if any(marker in lowered for marker in noise_markers):
            continue

        if text.startswith("# "):
            heading = text.lstrip("# ").strip()
            if len(heading) > 100 or "," not in heading:
                return heading[:200]

        if not text.startswith("#") and len(text) > 10:
            commas = text.count(",")
            if commas == 0 or (len(text) > 50 and commas <= 2):
                return text[:200]

    return "Unknown"
def _extract_authors_from_doc(doc) -> str:
    """Guess the author list from a Docling document.

    A non-empty ``doc.authors`` wins: a list is joined with ", ", anything
    else is converted to a stripped string. Otherwise the first 10 lines of
    the Markdown export are searched for a non-heading line with at least
    two commas and no digit among its first 20 characters; that line is
    returned cut to 300 characters. Returns "Unknown" when nothing matches.
    """
    declared = getattr(doc, "authors", None)
    if declared:
        if isinstance(declared, list):
            return ", ".join(map(str, declared))
        return str(declared).strip()

    leading_lines = doc.export_to_markdown().splitlines()[:10]
    for raw_line in leading_lines:
        text = raw_line.strip()
        if not text or text.startswith("#"):
            continue
        # An author list is expected to contain several commas...
        if text.count(",") < 2:
            continue
        # ...and no digits near its start (which would suggest a year).
        if any(ch.isdigit() for ch in text[:20]):
            continue
        return text[:300]
    return "Unknown"
def _extract_abstract_from_sections(sections: list[ParsedSection]) -> str:
    """Return the text of the first section named "abstract" (any case).

    The text is cut to 1000 characters; an empty string is returned when no
    such section exists.
    """
    abstract_text = next(
        (section.text for section in sections if section.name.lower() == "abstract"),
        None,
    )
    return "" if abstract_text is None else abstract_text[:1000]
[docs]
def parse_pdf(path: Path, config: ParserConfig | None = None, manifest: dict[str, dict[str, str]] | None = None) -> ParsedPaper:
    """Parse a single PDF using Docling and return structured output.

    Metadata comes from the manifest entry keyed by the PDF's file name when
    one exists; otherwise title and authors are extracted heuristically from
    the converted document. The Markdown export is split into sections at
    recognised headings. If no section is found and
    ``config.fallback_to_raw`` is set, the whole text becomes a single
    "Full Text" section.

    Args:
        path: Path to PDF file
        config: Parser configuration; a default ``ParserConfig()`` is used
            when omitted
        manifest: Optional manifest dict for fast metadata lookup

    Returns:
        The parsed paper. If Docling conversion fails, the error is logged
        and a paper with empty ``raw_text`` and no sections is returned,
        carrying manifest metadata when available and the file stem as title
        otherwise.
    """
    # Imported before any other work and outside the try block below, so a
    # missing Docling installation surfaces as an import error.
    DocumentConverter, PdfFormatOption, InputFormat, PdfPipelineOptions = _import_docling()
    config = config or ParserConfig()
    file_hash = compute_file_hash(path)

    # Check manifest for pre-extracted metadata
    manifest_entry = manifest.get(path.name) if manifest else None
    if manifest_entry:
        logger.info("Using manifest metadata for %s", path.name)

    should_use_ocr, ocr_status = _decide_ocr(path, config.ocr_mode)
    logger.info("Parsing %s (hash=%s)%s", path.name, file_hash[:12], ocr_status)

    try:
        if should_use_ocr:
            # Scanned PDF: use Docling's default pipeline
            converter = DocumentConverter()
        else:
            # Text-based PDF: disable OCR for speed
            pipeline_options = PdfPipelineOptions(
                do_ocr=False,
                do_table_structure=False,
            )
            converter = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(
                        pipeline_options=pipeline_options
                    )
                }
            )
        doc = converter.convert(str(path)).document
    except Exception as exc:
        logger.error("Failed to parse %s: %s", path.name, exc)
        # Use manifest metadata if available even on error
        known = manifest_entry or {}
        return ParsedPaper(
            file_path=str(path),
            file_hash=file_hash,
            title=known.get('title', path.stem),
            authors=known.get('authors', ''),
            raw_text="",
            abstract=known.get('abstract', ''),
            doi=known.get('doi', ''),
        )

    # Use manifest metadata if available, otherwise extract
    if manifest_entry:
        title = manifest_entry.get('title', 'Unknown')
        authors = manifest_entry.get('authors', 'Unknown')
        abstract = manifest_entry.get('abstract', '')
        doi = manifest_entry.get('doi', '')
    else:
        title = _extract_title_from_doc(doc)
        authors = _extract_authors_from_doc(doc)
        abstract = ""  # Will extract from sections
        doi = ""

    raw_text = doc.export_to_markdown()
    sections = _split_markdown_sections(raw_text)
    if not sections and config.fallback_to_raw and raw_text.strip():
        logger.warning("No sections extracted from %s — using raw text fallback", path.name)
        sections = [ParsedSection(name="Full Text", text=raw_text.strip())]

    # Extract abstract from sections if not from manifest
    if not abstract and sections:
        abstract = _extract_abstract_from_sections(sections)

    return ParsedPaper(
        file_path=str(path),
        file_hash=file_hash,
        title=title,
        authors=authors,
        sections=sections,
        raw_text=raw_text,
        abstract=abstract,
        doi=doi,
    )


def _decide_ocr(path: Path, ocr_mode: str) -> tuple[bool, str]:
    """Return ``(use_ocr, log_suffix)`` for *path* under the given OCR mode."""
    if ocr_mode == "always":
        return True, " [OCR: forced]"
    if ocr_mode == "never":
        return False, " [OCR: disabled]"
    # Any other value is treated as auto mode: OCR only when no text is found.
    if has_text_layer(path):
        return False, " [OCR: skipped, text detected]"
    return True, " [OCR: enabled, scanned PDF]"


def _split_markdown_sections(md_text: str) -> list[ParsedSection]:
    """Split Markdown into sections at headings known to ``_normalise_heading``."""
    sections: list[ParsedSection] = []
    current_heading: str | None = None
    current_lines: list[str] = []

    for line in md_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("#"):
            normalised = _normalise_heading(stripped.lstrip("#").strip())
            if normalised:
                # A recognised heading closes the section collected so far.
                # Text before the first recognised heading is discarded, as
                # is a heading that collected no lines.
                if current_heading and current_lines:
                    sections.append(
                        ParsedSection(
                            name=current_heading,
                            text="\n".join(current_lines).strip(),
                        )
                    )
                current_heading = normalised
                current_lines = []
                continue
        # Unrecognised headings stay in the body of the current section.
        current_lines.append(line)

    if current_heading and current_lines:
        sections.append(
            ParsedSection(
                name=current_heading,
                text="\n".join(current_lines).strip(),
            )
        )
    return sections
[docs]
def discover_pdfs(input_dir: Path) -> list[Path]:
    """Recursively find all PDF files under *input_dir*, or return a single PDF file.

    The ``.pdf`` extension is matched case-insensitively in both modes, so
    ``paper.PDF`` is found in a directory scan just as it is accepted as a
    single file. Directories whose name ends in ``.pdf`` are not returned.

    Args:
        input_dir: A directory to scan, or the path of one PDF file

    Returns:
        Sorted list of PDF paths. Empty if the path does not exist or is a
        file that is not a PDF; both cases are logged as errors.
    """
    if not input_dir.exists():
        logger.error("Input path does not exist: %s", input_dir)
        return []

    if input_dir.is_file():
        if input_dir.suffix.lower() == ".pdf":
            logger.info("Single PDF file: %s", input_dir)
            return [input_dir]
        logger.error("Not a PDF file: %s", input_dir)
        return []

    # A "*.pdf" glob pattern is case-sensitive on POSIX filesystems, so every
    # entry is filtered on its lower-cased suffix instead.
    pdfs = sorted(
        candidate
        for candidate in input_dir.rglob("*")
        if candidate.suffix.lower() == ".pdf" and candidate.is_file()
    )
    logger.info("Discovered %d PDF(s) in %s", len(pdfs), input_dir)
    return pdfs