Source code for paperrag.parallel

"""Parallel PDF processing utilities."""

from __future__ import annotations

import logging
from functools import partial
from multiprocessing import Pool
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from paperrag.chunker import Chunk
    from paperrag.config import ChunkerConfig, ParserConfig

logger = logging.getLogger(__name__)


def process_single_pdf(
    pdf_path: Path,
    parser_config: ParserConfig,
    chunker_config: ChunkerConfig,
    manifest: dict[str, dict[str, str]] | None = None,
) -> tuple[Path, str | None, list[Chunk] | None, str | None]:
    """Parse a single PDF and split it into chunks (no embedding happens here).

    Any exception raised along the way -- including a failure to import the
    parser/chunker modules, which are loaded lazily inside the function -- is
    logged and reported through the returned tuple instead of propagating.

    Args:
        pdf_path: Location of the PDF to process.
        parser_config: Settings forwarded to ``parse_pdf``.
        chunker_config: Settings forwarded to ``chunk_paper``.
        manifest: Optional manifest mapping, forwarded to ``parse_pdf``.

    Returns:
        A 4-tuple ``(pdf_path, file_hash, chunks, error_message)``:

        * success: ``(pdf_path, file_hash, chunks, None)``
        * failure: ``(pdf_path, None, None, error_message)``
    """
    try:
        # Deferred imports: kept inside the ``try`` so an import failure is
        # reported as an error tuple like any other processing failure.
        from paperrag.chunker import chunk_paper
        from paperrag.parser import parse_pdf

        document = parse_pdf(pdf_path, parser_config, manifest)
        pieces = chunk_paper(document, chunker_config)
        # Read the hash only after chunking, and still inside the ``try`` so
        # a missing attribute is handled by the same error path.
        digest = document.file_hash
    except Exception as exc:
        logger.error("Failed to process %s: %s", pdf_path.name, exc)
        return (pdf_path, None, None, str(exc))

    return (pdf_path, digest, pieces, None)
def parallel_process_pdfs(
    pdf_paths: list[Path],
    parser_config: ParserConfig,
    chunker_config: ChunkerConfig,
    n_workers: int,
    timeout: int = 0,
    manifest: dict[str, dict[str, str]] | None = None,
) -> list[tuple[Path, str | None, list[Chunk] | None, str | None]]:
    """Parse and chunk many PDFs, optionally across several worker processes.

    Results are returned in the same order as ``pdf_paths``.

    Args:
        pdf_paths: PDF files to process. An empty list returns ``[]``
            immediately without starting any worker.
        parser_config: Parser configuration passed to every task.
        chunker_config: Chunker configuration passed to every task.
        n_workers: Number of worker processes. ``1`` runs everything in the
            calling process without creating a pool.
        timeout: Maximum number of seconds to wait for each PDF's result
            (``0`` or negative = wait indefinitely). All tasks are submitted
            up front and collected in order, so the clock for a given PDF
            starts when the parent begins waiting for that PDF's result.
        manifest: Optional manifest dict for fast metadata lookup.

    Returns:
        One ``(pdf_path, file_hash, chunks, error_message)`` tuple per input
        path. When a timeout is configured, a PDF whose result does not
        arrive in time, or whose result retrieval raises, yields
        ``(pdf_path, None, None, error_message)``.

    Raises:
        ValueError: Propagated from ``multiprocessing.Pool`` when
            ``n_workers`` is less than 1 and there is work to do.
    """
    # Imported locally because the module only imports ``Pool`` at top level.
    from multiprocessing import TimeoutError as PoolTimeoutError

    if not pdf_paths:
        # Nothing to do: avoid spawning a pool just to map over nothing.
        return []

    if n_workers == 1:
        # In-process mode (for debugging or compatibility).
        logger.info("Processing PDFs in single-threaded mode")
        return [
            process_single_pdf(pdf, parser_config, chunker_config, manifest)
            for pdf in pdf_paths
        ]

    logger.info("Processing PDFs with %d workers (timeout=%ds)", n_workers, timeout)

    process_fn = partial(
        process_single_pdf,
        parser_config=parser_config,
        chunker_config=chunker_config,
        manifest=manifest,
    )

    # maxtasksperchild=1 replaces each worker after it finishes one task,
    # releasing its memory (crucial for heavy ML models).
    with Pool(processes=n_workers, maxtasksperchild=1) as pool:
        if timeout <= 0:
            # chunksize=1 makes every PDF its own pool task; with the default
            # chunking several PDFs are batched into one task, so a worker
            # would not be recycled after each PDF.
            return pool.map(process_fn, pdf_paths, chunksize=1)

        # Submit everything first so the workers actually run concurrently;
        # submitting and waiting inside the same loop would keep only one
        # task in flight at a time.
        pending = [
            (pdf_path, pool.apply_async(process_fn, (pdf_path,)))
            for pdf_path in pdf_paths
        ]

        results = []
        for pdf_path, async_result in pending:
            try:
                results.append(async_result.get(timeout=timeout))
            except PoolTimeoutError:
                logger.error(
                    "Timeout processing %s after %ds", pdf_path.name, timeout
                )
                results.append(
                    (pdf_path, None, None, f"Timeout after {timeout}s")
                )
            except Exception as e:
                # Not a timeout (e.g. the result could not be transferred
                # back from the worker): report the real cause instead of
                # labelling it a timeout.
                logger.error("Error processing %s: %s", pdf_path.name, e)
                results.append(
                    (pdf_path, None, None, f"{type(e).__name__}: {e}")
                )
        return results