Source code for paperrag.cli

"""Typer CLI for PaperRAG."""

from __future__ import annotations

import datetime
import gc
import json
import logging
import re
import sys
from pathlib import Path

import multiprocessing
import psutil
import typer
from rich.console import Console
from rich.table import Table
from tqdm import tqdm

# Force 'spawn' method for multiprocessing to avoid deadlocks with PyTorch/Docling/other libraries that use OpenMP/CUDA
try:
    multiprocessing.set_start_method("spawn", force=True)
except RuntimeError:
    pass

from paperrag.config import PaperRAGConfig, load_rc, apply_rc, PROMPT_PRESETS, PRESET_MAX_TOKENS
from paperrag import __version__

EXAMPLES_EPILOG = (
    "Examples:\n\n"
    "  paperrag                              # auto-discover index from CWD\n\n"
    "  paperrag --index-dir /path/to/index   # REPL with a specific index\n\n"
    "  paperrag index --input-dir ./papers   # index PDFs first\n\n"
    "  paperrag query \"What is attention?\"   # one-shot query\n\n"
    "  paperrag query \"What is attention?\" --no-llm   # raw retrieval results\n\n"
    "  paperrag review paper.pdf             # index + review a single PDF\n"
)

app = typer.Typer(
    name="paperrag",
    help="PaperRAG - local RAG for academic PDFs.",
    epilog=EXAMPLES_EPILOG,
    invoke_without_command=True,
    context_settings={"help_option_names": ["-h", "--help"]},
)
console = Console()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("paperrag")

MIT_LICENSE = """
MIT License

Copyright (c) 2024 PaperRAG Team

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""


[docs] def version_callback(value: bool) -> None: """Display version and license information.""" if value: console.print(f"[bold]PaperRAG[/bold] version [cyan]{__version__}[/cyan]") console.print(MIT_LICENSE) raise typer.Exit()
def _print_gpu_info() -> None: """Detect and display GPU availability with an Ollama inference hint.""" import subprocess import platform # --- NVIDIA: use nvidia-smi (same detection path as Ollama) --- try: result = subprocess.run( ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"], capture_output=True, text=True, timeout=3, ) if result.returncode == 0: gpu_name = result.stdout.strip().splitlines()[0] console.print( f"[green]GPU detected:[/green] {gpu_name} — Ollama will use it automatically for faster inference" ) return except Exception: pass # --- Apple Silicon MPS --- if platform.system() == "Darwin": try: import torch if hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): console.print( "[green]GPU detected:[/green] Apple Silicon MPS — Ollama will use it automatically" ) return except Exception: pass console.print("[dim]Running on CPU, No GPU detected[/dim]")
[docs] @app.callback(invoke_without_command=True) def entrypoint( ctx: typer.Context, version: bool = typer.Option( None, "--version", "-v", callback=version_callback, is_eager=True, help="Show version and license", ), input_dir: str = typer.Option( None, "--input-dir", "-d", help="PDF directory or single PDF file to index" ), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (will auto-discover .paperrag-index subdirectory if needed)", ), topk: int = typer.Option( None, "--top-k", "--topk", "-k", help="Number of chunks to retrieve for context (default: 3)", ), model: str = typer.Option( None, "--model", "-m", help="LLM model name (e.g., qwen3:1.7b)" ), threshold: float = typer.Option( None, "--threshold", "-t", help="Minimum similarity score threshold (0.0-1.0, default: 0.15)", ), temperature: float = typer.Option( None, "--temperature", "--temp", help="LLM temperature (0.0-2.0, default: 0.0)" ), max_tokens: int = typer.Option( None, "--max-tokens", help="LLM max output tokens (default: 256)" ), ctx_size: int = typer.Option( None, "--ctx-size", min=512, help="LLM context window size (default: 2048)" ), system_prompt: str = typer.Option( None, "--system-prompt", "--prompt", help="LLM system prompt" ), think: bool = typer.Option( False, "--think/--no-think", help="Enable thinking/reasoning mode for supported models (e.g. Qwen3)" ), ) -> None: """PaperRAG - local RAG for academic PDFs. Starts an interactive REPL session using an existing index. """ if ctx.invoked_subcommand is None: from paperrag.repl import start_repl if input_dir: console.print( "[yellow]⚠ Warning: --input-dir / -d does not auto-index PDFs when " "starting the REPL directly.\n" " It may still be used by REPL features (for example, PDF discovery " "or as the default target for /index).\n" " To index PDFs immediately run: paperrag index --input-dir <path>\n" " To start the REPL with a specific index use: paperrag --index-dir " "<path>[/yellow]" ) cfg = PaperRAGConfig() # Load .paperragrc: global first, then local overrides global_rc = load_rc(Path.home() / ".paperragrc") local_rc = load_rc(Path.cwd() / ".paperragrc") apply_rc(cfg, global_rc) apply_rc(cfg, local_rc) # Resolve effective index_dir: CLI arg takes priority over RC effective_index_dir = index_dir or cfg._index_dir # Auto-discover index from CWD if no explicit index_dir given from paperrag.vectorstore import VectorStore if not effective_index_dir: cwd = Path.cwd() for candidate in [cwd / ".paperrag-index", cwd]: if (candidate / "version.json").exists(): effective_index_dir = str(candidate) console.print(f"[dim]Using index at {candidate}[/dim]") break else: console.print( "[red]Error: no index found. Pass --index-dir or run paperrag from an indexed folder.[/red]" ) console.print( "[dim]Tip: set index-dir in ~/.paperragrc to skip this flag[/dim]" ) raise typer.Exit(1) index_path = Path(effective_index_dir).resolve() # Check if index_dir points directly to an index if not (index_path / "version.json").exists(): # Try subdirectory convention subdir_path = index_path / ".paperrag-index" if (subdir_path / "version.json").exists(): console.print(f"[dim]Found index at {subdir_path}[/dim]") index_path = subdir_path else: console.print( f"[red]No index found at {index_path} or {subdir_path}[/red]" ) raise typer.Exit(1) cfg.index_dir = str(index_path) # Load config snapshot from discovered index if VectorStore.exists(Path(cfg.index_dir)): snapshot_file = Path(cfg.index_dir) / "config_snapshot.json" if snapshot_file.exists(): try: loaded_cfg = PaperRAGConfig.load_snapshot(snapshot_file) # Apply input_dir from snapshot (but keep CLI overrides) if ( not input_dir ): # Only use snapshot if user didn't specify input_dir cfg.input_dir = loaded_cfg.input_dir console.print( f"[dim]Loaded PDF directory from index: {cfg.input_dir}[/dim]" ) except Exception as e: logger.warning("Could not load config snapshot: %s", e) # Apply input_dir CLI override if specified if input_dir: cfg.input_dir = input_dir if topk is not None: cfg.retriever.top_k = topk if model: cfg.llm.model_name = model if threshold is not None: cfg.retriever.score_threshold = threshold if temperature is not None: cfg.llm.temperature = temperature if max_tokens is not None: cfg.llm.max_tokens = max_tokens if ctx_size is not None: cfg.llm.ctx_size = ctx_size if system_prompt: cfg.llm.system_prompt = system_prompt if think: cfg.llm.think = think _print_gpu_info() start_repl(cfg)
# -- index -----------------------------------------------------------------
[docs] @app.command() def index( input_dir: str = typer.Option( None, "--input-dir", "-d", help="PDF directory or single PDF file" ), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (default: <input-dir>/.paperrag-index)", ), force: bool = typer.Option(False, "--force", "-f", help="Force full re-index"), checkpoint_interval: int = typer.Option( None, "--checkpoint-interval", "-c", help="Save index every N PDFs (0 to disable checkpointing)", ), workers: int = typer.Option( None, "--workers", "-w", help="Number of parallel workers (0 = auto-detect)" ), ocr: str = typer.Option( "auto", "--ocr", help="OCR mode: 'auto' (detect per PDF, recommended), 'always' (force), 'never' (disable)", ), manifest: str = typer.Option( None, "--manifest", help="CSV manifest file with columns: filename,title,authors,abstract,doi (speeds up indexing)", ), embed_model: str = typer.Option( None, "--embed-model", help="Sentence-Transformers model name or local path for embedding (default: sentence-transformers/all-MiniLM-L6-v2)", ), ) -> None: """Index PDF files into the FAISS vector store.""" from paperrag.chunker import chunk_paper from paperrag.embedder import Embedder from paperrag.parser import ( compute_file_hashes_parallel, discover_pdfs, parse_pdf, load_manifest, ) from paperrag.parallel import parallel_process_pdfs from paperrag.vectorstore import VectorStore cfg = PaperRAGConfig() # Load .paperragrc: global first, then local overrides global_rc = load_rc(Path.home() / ".paperragrc") local_rc = load_rc(Path.cwd() / ".paperragrc") apply_rc(cfg, global_rc) apply_rc(cfg, local_rc) if input_dir: cfg.input_dir = input_dir elif not global_rc.get("input-dir") and not local_rc.get("input-dir"): console.print("[red]Error: --input-dir (-d) is required[/red]") console.print("Usage: paperrag index --input-dir <path> [--index-dir <path>]") console.print( "[dim]Tip: set input-dir in ~/.paperragrc to skip this flag[/dim]" ) raise typer.Exit(1) if index_dir: cfg.index_dir = index_dir if checkpoint_interval is not None: cfg.indexing.checkpoint_interval = checkpoint_interval if workers is not None: cfg.indexing.n_workers = workers if embed_model: cfg.embedder.model_name = embed_model console.print(f"[cyan]Embed model: {cfg.embedder.model_name}[/cyan]") # Set OCR mode with validation ocr_lower = ocr.lower() if ocr_lower in ["auto", "always", "never"]: cfg.parser.ocr_mode = ocr_lower # type: ignore[assignment] if ocr_lower == "auto": console.print("[cyan]🔍 Adaptive OCR enabled (auto-detect per PDF)[/cyan]") elif ocr_lower == "never": console.print("[yellow]⚡ OCR disabled for all PDFs[/yellow]") elif ocr_lower == "always": console.print("[yellow]📄 OCR enabled for all PDFs[/yellow]") else: console.print( f"[red]Invalid OCR mode: {ocr}. Use 'auto', 'always', or 'never'.[/red]" ) raise typer.Exit(1) pdf_dir = Path(cfg.input_dir) idx_dir = Path(cfg.index_dir) pdfs = discover_pdfs(pdf_dir) if not pdfs: console.print("[red]No PDFs found.[/red]") raise typer.Exit(1) is_single_file = pdf_dir.is_file() if is_single_file: console.print(f"Indexing single PDF: [green]{pdf_dir.name}[/green]") else: console.print(f"Found [green]{len(pdfs)}[/green] PDF(s) in {pdf_dir}") _print_gpu_info() embedder = Embedder(cfg.embedder) # Load or create store if VectorStore.exists(idx_dir) and not force: store = VectorStore.load(idx_dir) if store.dimension != embedder.dimension: console.print("[yellow]Dimension mismatch - rebuilding index.[/yellow]") store = VectorStore(idx_dir, embedder.dimension) else: store = VectorStore(idx_dir, embedder.dimension) # Remove deleted files from the index (skip for single-file mode to avoid purging other files) stale = [] if not is_single_file: current_paths = {str(p) for p in pdfs} stale = [fp for fp in list(store.file_hashes) if fp not in current_paths] for fp in stale: store.remove_by_file(fp) del store.file_hashes[fp] if stale: console.print( f"Removed [red]{len(stale)}[/red] deleted file(s) from index." ) # Determine which files need (re)indexing - use parallel hashing if workers is None: n_workers = cfg.indexing.get_n_workers() else: n_workers = workers if workers > 0 else 1 cfg.indexing.n_workers = n_workers console.print(f"Computing hashes for {len(pdfs)} PDFs with {n_workers} workers...") pdf_hashes = compute_file_hashes_parallel(pdfs, n_workers) to_index: list[Path] = [] for pdf in pdfs: current_hash = pdf_hashes.get(str(pdf)) stored_hash = store.get_file_hash(str(pdf)) if force or stored_hash is None or stored_hash != current_hash: if stored_hash is not None: store.remove_by_file(str(pdf)) to_index.append(pdf) if not to_index and not stale: console.print("[green]Index is up-to-date. No files to re-index.[/green]") # Ensure config snapshot is saved even if index is up-to-date # This helps persist input_dir for future REPL sessions cfg.save_snapshot(idx_dir / "config_snapshot.json") raise typer.Exit(0) if not to_index: store.version += 1 store.save(config=cfg) console.print(f"[green]Done![/green] Index version: {store.version}") raise typer.Exit(0) # Process in batches of 5 (reduced from 10 to save RAM) BATCH_SIZE = 5 total_files = len(to_index) total_chunks = 0 total_successes = 0 total_failures = 0 failed_pdfs: list[tuple[str, str]] = [] # (pdf_path, error_message) console.print( f"Indexing [cyan]{total_files}[/cyan] PDF(s) in batches of {BATCH_SIZE}..." ) # Show memory and worker info for transparency try: mem = psutil.virtual_memory() console.print( f"Using [green]{n_workers}[/green] worker(s) for parallel processing " f"([dim]{mem.available / (1024**3):.1f}GB RAM available[/dim])" ) if mem.available < 4 * 1024**3: # Less than 4GB available console.print( "[yellow]⚠ Low memory detected. If indexing fails, try: --workers 2[/yellow]" ) except Exception: console.print( f"Using [green]{n_workers}[/green] worker(s) for parallel processing." ) # Helper function to log memory usage def log_memory(): if cfg.indexing.log_memory_usage: process = psutil.Process() mem_info = process.memory_info() logger.info( "Memory usage: RSS=%.1f MB, VMS=%.1f MB", mem_info.rss / 1024 / 1024, mem_info.vms / 1024 / 1024, ) for i in range(0, total_files, BATCH_SIZE): batch = to_index[i : i + BATCH_SIZE] batch_num = i // BATCH_SIZE + 1 total_batches = (total_files + BATCH_SIZE - 1) // BATCH_SIZE console.print(f"\nBatch {batch_num}/{total_batches} ({len(batch)} files)...") # Memory guard: warn if running low before processing batch try: mem = psutil.virtual_memory() if mem.available < 2 * 1024**3: # Less than 2GB available console.print( f"[yellow]⚠ Low memory warning: {mem.available / (1024**3):.1f}GB available. " "Consider reducing --workers if indexing fails.[/yellow]" ) except Exception: pass log_memory() # Parallel parse + chunk phase for this batch parsed_results = parallel_process_pdfs( batch, cfg.parser, cfg.chunker, n_workers, timeout=cfg.indexing.pdf_timeout ) # Sequential embed + add phase with comprehensive error handling batch_chunks = 0 batch_successes = 0 batch_failures = 0 for pdf_path, file_hash, chunks, error in tqdm( parsed_results, desc="Embedding", unit="file" ): # Handle parsing errors if error: logger.error("Failed to parse %s: %s", pdf_path.name, error) failed_pdfs.append((str(pdf_path), f"Parse error: {error}")) batch_failures += 1 total_failures += 1 # Check if we've exceeded max failures if ( cfg.indexing.max_failures > 0 and total_failures >= cfg.indexing.max_failures ): console.print( f"[red]Reached maximum failures ({cfg.indexing.max_failures}). Stopping.[/red]" ) break if not cfg.indexing.continue_on_error: console.print( "[red]Stopping due to error (continue_on_error=False)[/red]" ) raise typer.Exit(1) continue if not chunks: logger.warning("No chunks produced for %s", pdf_path.name) failed_pdfs.append((str(pdf_path), "No chunks produced")) batch_failures += 1 total_failures += 1 continue # Embedding phase with error handling try: texts = [c.text for c in chunks] embeddings = embedder.embed(texts) store.add(embeddings, chunks) store.set_file_hash(str(pdf_path), file_hash) batch_chunks += len(chunks) batch_successes += 1 total_successes += 1 except Exception as e: logger.error( "Failed to embed/store %s: %s", pdf_path.name, e, exc_info=True ) failed_pdfs.append((str(pdf_path), f"Embedding error: {e}")) batch_failures += 1 total_failures += 1 # Check if we've exceeded max failures if ( cfg.indexing.max_failures > 0 and total_failures >= cfg.indexing.max_failures ): console.print( f"[red]Reached maximum failures ({cfg.indexing.max_failures}). Stopping.[/red]" ) break if not cfg.indexing.continue_on_error: console.print( "[red]Stopping due to error (continue_on_error=False)[/red]" ) raise typer.Exit(1) continue total_chunks += batch_chunks # Report batch statistics console.print( f"Batch {batch_num} complete: [green]{batch_successes} succeeded[/green], " f"[red]{batch_failures} failed[/red], {batch_chunks} chunks added" ) # Save after every batch for resumption with retry logic max_retries = 3 save_success = False for retry in range(max_retries): try: store.save(config=cfg) console.print( f"✓ Checkpoint saved: [cyan]{total_successes}/{total_files}[/cyan] PDFs indexed " f"([dim]{total_chunks} total chunks[/dim])" ) save_success = True break except Exception as e: if retry < max_retries - 1: logger.warning( "Failed to save index (attempt %d/%d): %s", retry + 1, max_retries, e, ) else: logger.error( "Failed to save index after %d attempts: %s", max_retries, e ) if not save_success: console.print("[red]Critical: Failed to save index checkpoint![/red]") if not cfg.indexing.continue_on_error: raise typer.Exit(1) # Memory cleanup between batches if cfg.indexing.enable_gc_per_batch: gc.collect() log_memory() # Check if we should stop due to max failures if ( cfg.indexing.max_failures > 0 and total_failures >= cfg.indexing.max_failures ): break # Final index save store.version += 1 store.save(config=cfg) # Write failed PDFs log if failed_pdfs: failed_log_path = idx_dir / "failed_pdfs.log" with open(failed_log_path, "w") as f: f.write(f"# Failed PDFs Log - {total_failures} failures\n") f.write(f"# Generated: {datetime.datetime.now()}\n\n") for pdf_path, error_msg in failed_pdfs: f.write(f"{pdf_path}\n Error: {error_msg}\n\n") console.print(f"\n[yellow]Failed PDFs logged to: {failed_log_path}[/yellow]") # Final summary console.print("\n" + "=" * 60) console.print("[bold]Indexing Summary[/bold]") console.print("=" * 60) console.print(f"Total PDFs processed: {total_files}") console.print(f" [green]✓ Successful: {total_successes}[/green]") console.print(f" [red]✗ Failed: {total_failures}[/red]") console.print(f"Total chunks indexed: {total_chunks}\n") console.print(f"Index version: {store.version}") console.print("=" * 60)
# -- review ----------------------------------------------------------------
[docs] @app.command() def review( input_path: str = typer.Argument(..., help="PDF file or directory to review"), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (default: auto-derived from input path)", ), model: str = typer.Option( None, "--model", "-m", help="LLM model name (e.g., qwen3:1.7b)" ), topk: int = typer.Option( None, "--top-k", "--topk", "-k", help="Number of chunks to retrieve for context (default: 3)", ), threshold: float = typer.Option( None, "--threshold", "-t", help="Minimum similarity score threshold (0.0-1.0)" ), temperature: float = typer.Option( None, "--temperature", "--temp", help="LLM temperature (0.0-2.0, default: 0.0)" ), max_tokens: int = typer.Option( None, "--max-tokens", help="LLM max output tokens (default: 256)" ), ctx_size: int = typer.Option( None, "--ctx-size", min=512, help="LLM context window size (default: 2048)" ), system_prompt: str = typer.Option( None, "--system-prompt", "--prompt", help="LLM system prompt" ), preset: str = typer.Option( None, "--preset", "-p", help=f"Named prompt preset: {', '.join(PROMPT_PRESETS.keys())}", ), n_gpu_layers: int = typer.Option( None, "--n-gpu-layers", "--ngl", help="GPU layers for llama.cpp (default: 0 = CPU)" ), output: str = typer.Option( None, "--output", "-o", help="Save Q&A session to this markdown file on exit", ), think: bool = typer.Option( False, "--think/--no-think", help="Enable thinking/reasoning mode for supported models (e.g. Qwen3)" ), ) -> None: """Index a PDF file (or directory) and start an interactive review session. Convenience command for focused paper review — equivalent to running: paperrag index --input-dir <path> && paperrag --index-dir <auto> Examples: paperrag review paper.pdf paperrag review paper.pdf --preset reviewer paperrag review paper.pdf --preset reviewer --output review.md paperrag review ./papers/ --topk 5 paperrag review paper.pdf --index-dir /tmp/my-index """ from paperrag.repl import _handle_index, start_repl path_obj = Path(input_path) if not path_obj.exists(): console.print(f"[red]Error: Path does not exist: {input_path}[/red]") raise typer.Exit(1) cfg = PaperRAGConfig() # Load .paperragrc: global first, then local overrides global_rc = load_rc(Path.home() / ".paperragrc") local_rc = load_rc(Path.cwd() / ".paperragrc") apply_rc(cfg, global_rc) apply_rc(cfg, local_rc) cfg.input_dir = str(path_obj) if index_dir: cfg.index_dir = index_dir else: # Clear any RC-set index_dir so it auto-derives from input_path cfg._index_dir = None if model: cfg.llm.model_name = model if topk is not None: cfg.retriever.top_k = topk if threshold is not None: cfg.retriever.score_threshold = threshold if temperature is not None: cfg.llm.temperature = temperature if max_tokens is not None: cfg.llm.max_tokens = max_tokens if ctx_size is not None: cfg.llm.ctx_size = ctx_size if system_prompt: cfg.llm.system_prompt = system_prompt if n_gpu_layers is not None: cfg.llm.n_gpu_layers = n_gpu_layers if think: cfg.llm.think = think # Apply named preset (--system-prompt takes priority over --preset) if preset is not None: preset_lower = preset.lower() if preset_lower not in PROMPT_PRESETS: console.print(f"[red]Unknown preset '{preset}'. Valid: {', '.join(PROMPT_PRESETS)}[/red]") raise typer.Exit(1) if not system_prompt: cfg.llm.system_prompt = PROMPT_PRESETS[preset_lower] if max_tokens is None: cfg.llm.max_tokens = PRESET_MAX_TOKENS.get(preset_lower, cfg.llm.max_tokens) else: # review mode: bump max_tokens to at least 512 for richer responses if max_tokens is None: cfg.llm.max_tokens = max(cfg.llm.max_tokens, 512) # Validate that PDFs can be found before indexing from paperrag.parser import discover_pdfs pdfs = discover_pdfs(path_obj) if not pdfs: console.print(f"[red]Error: No PDFs found at {input_path}[/red]") raise typer.Exit(1) _print_gpu_info() # Step 1: Index the content _handle_index(cfg) # Step 2: Start interactive review session (auto-focus when reviewing a single PDF) auto_focus = pdfs[0] if len(pdfs) == 1 else None output_path = Path(output) if output else None start_repl(cfg, auto_focus=auto_focus, review_mode=True, output_path=output_path)
[docs] @app.command() def query( question: str = typer.Argument(..., help="Your question"), top_k: int = typer.Option(3, "--top-k", "-k"), threshold: float = typer.Option( None, "--threshold", "-t", help="Minimum similarity score threshold (0.0-1.0)" ), temperature: float = typer.Option( None, "--temperature", help="LLM temperature (0.0-2.0, default: 0.0)" ), max_tokens: int = typer.Option( None, "--max-tokens", help="LLM max output tokens (default: 256)" ), ctx_size: int = typer.Option( None, "--ctx-size", min=512, help="LLM context window size (default: 2048)" ), system_prompt: str = typer.Option( None, "--system-prompt", "--prompt", help="LLM system prompt" ), input_dir: str = typer.Option( None, "--input-dir", "-d", help="PDF directory or single PDF file" ), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (required)" ), model: str = typer.Option( None, "--model", "-m", help="LLM model name (e.g., qwen3:1.7b)" ), no_llm: bool = typer.Option( False, "--no-llm", help="Return raw retrieval results without calling the LLM" ), think: bool = typer.Option( False, "--think/--no-think", help="Enable thinking/reasoning mode for supported models (e.g. Qwen3)" ), ) -> None: """Query the indexed papers.""" from paperrag.retriever import Retriever cfg = PaperRAGConfig() # Load .paperragrc: global first, then local overrides global_rc = load_rc(Path.home() / ".paperragrc") local_rc = load_rc(Path.cwd() / ".paperragrc") apply_rc(cfg, global_rc) apply_rc(cfg, local_rc) # Resolve effective index_dir: CLI arg takes priority over RC effective_index_dir = index_dir or cfg._index_dir # Query mode requires index_dir (from CLI or .paperragrc) if not effective_index_dir: console.print("[red]Error: --index-dir is required for query command[/red]") console.print("Usage: paperrag query <question> --index-dir <path> [options]") console.print( "[dim]Tip: set index-dir in ~/.paperragrc to skip this flag[/dim]" ) raise typer.Exit(1) if input_dir: cfg.input_dir = input_dir elif effective_index_dir: # Try to load from index snapshot if input_dir not provided snapshot_path = Path(effective_index_dir) / "config_snapshot.json" if snapshot_path.exists(): try: loaded_cfg = PaperRAGConfig.load_snapshot(snapshot_path) cfg.input_dir = loaded_cfg.input_dir except Exception: pass # Fallback to default if load fails if index_dir: cfg.index_dir = index_dir if model: cfg.llm.model_name = model if threshold is not None: cfg.retriever.score_threshold = threshold if temperature is not None: cfg.llm.temperature = temperature if max_tokens is not None: cfg.llm.max_tokens = max_tokens if ctx_size is not None: cfg.llm.ctx_size = ctx_size if system_prompt: cfg.llm.system_prompt = system_prompt if think: cfg.llm.think = think try: retriever = Retriever(cfg) except FileNotFoundError as exc: console.print(f"[red]{exc}[/red]") raise typer.Exit(1) import time t0 = time.perf_counter() results = retriever.retrieve(question, top_k=top_k) t_retrieval = time.perf_counter() - t0 if not results: console.print("[yellow]No results found.[/yellow]") raise typer.Exit(0) from pathlib import Path as PathlibPath if no_llm: console.print(f"\n[bold]Retrieved Chunks[/bold] [dim]({t_retrieval:.2f}s)[/dim]") for i, result in enumerate(results, start=1): filename = PathlibPath(result.file_path).name snippet = re.sub(r"\s+", " ", result.text).strip() if len(snippet) > 200: snippet = snippet[:197].rstrip() + "..." console.print( f" [cyan][{i}][/cyan] {filename} | {result.section_name} | " f"chunk {result.chunk_id} [dim]({result.score:.2f})[/dim]" ) console.print(f" {snippet}") console.print(f"\n[dim]Retrieval only: {t_retrieval:.2f}s[/dim]\n") return import sys from paperrag.llm import stream_answer # Show retrieved sources immediately so the user sees useful info # while waiting for the LLM to generate. console.print(f"\n[bold]Sources[/bold] [dim]({t_retrieval:.2f}s)[/dim]") seen_files: dict[str, int] = {} for r in results: if r.file_path not in seen_files: seen_files[r.file_path] = len(seen_files) + 1 for file_path, label in seen_files.items(): filename = PathlibPath(file_path).name best_score = max(r.score for r in results if r.file_path == file_path) console.print( f" [cyan][{label}][/cyan] {filename} [dim]({best_score:.2f})[/dim]" ) context_chunks = [r.text for r in results] source_files = [r.file_path for r in results] try: full_answer = "" header_printed = False t1 = time.perf_counter() for chunk in stream_answer( question, context_chunks, cfg.llm, source_files=source_files ): if not header_printed: console.print("\n[bold green]Answer:[/bold green]") header_printed = True sys.stdout.write(chunk) sys.stdout.flush() full_answer += chunk sys.stdout.write("\n") sys.stdout.flush() t_llm = time.perf_counter() - t1 t_total = time.perf_counter() - t0 console.print( f"\n[dim]Retrieval: {t_retrieval:.2f}s | LLM: {t_llm:.2f}s | Total: {t_total:.2f}s[/dim]\n" ) except ImportError as exc: console.print(f"[yellow]{exc}[/yellow]") except ValueError as exc: # LLM not configured - this is fine, just skip it console.print(f"\n[dim]💡 {exc}[/dim]") except Exception as exc: from paperrag.llm import describe_llm_error error_msg, hint = describe_llm_error(exc, cfg.llm.model_name) console.print(f"[red]{error_msg}[/red]") if hint: console.print(f"[yellow]Fix: {hint}[/yellow]")
# -- evaluate --------------------------------------------------------------
[docs] @app.command() def evaluate( benchmark_file: str = typer.Argument(..., help="JSONL benchmark file"), top_k: int = typer.Option(3, "--top-k", "-k"), input_dir: str = typer.Option( None, "--input-dir", "-d", help="PDF directory or single PDF file" ), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (default: <input-dir>/.paperrag-index)", ), ) -> None: """Evaluate retrieval quality using a JSONL benchmark. Each line: {"question": "...", "relevant_documents": ["path1", ...]} """ from paperrag.retriever import Retriever cfg = PaperRAGConfig() if input_dir: cfg.input_dir = input_dir if index_dir: cfg.index_dir = index_dir try: retriever = Retriever(cfg) except FileNotFoundError as exc: console.print(f"[red]{exc}[/red]") raise typer.Exit(1) from paperrag.benchmark import evaluate as run_eval results = run_eval( benchmark_file, retriever_fn=lambda q: retriever.retrieve_file_paths(q, top_k=top_k), k=top_k, ) console.print("\n[bold]Evaluation Results[/bold]") for metric, value in results.items(): console.print(f" {metric}: {value:.4f}")
[docs] @app.command() def export( query: str = typer.Option(None, "--query", "-q", help="Question to query"), output_path: str = typer.Option(..., "--output", help="Output file path"), format: str = typer.Option( "markdown", "--format", help="Export format (markdown, csv, json)" ), top_k: int = typer.Option(3, "--top-k", "-k"), threshold: float = typer.Option( None, "--threshold", "-t", help="Minimum similarity score threshold (0.0-1.0)" ), input_dir: str = typer.Option( None, "--input-dir", "-d", help="PDF directory or single PDF file" ), index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (default: <input-dir>/.paperrag-index)", ), ) -> None: """Export query results to a file. Retrieves and saves results in the specified format. """ from paperrag.retriever import Retriever # Prompt for query if not provided if not query: query = typer.prompt("Question", type=str) if query.strip() == "": console.print("[red]Error: Question cannot be empty[/red]") raise typer.Exit(1) cfg = PaperRAGConfig() if input_dir: cfg.input_dir = input_dir if index_dir: cfg.index_dir = index_dir # Resolve effective index_dir if not cfg._index_dir: console.print("[red]Error: --index-dir is required for export command[/red]") raise typer.Exit(1) try: retriever = Retriever(cfg) except FileNotFoundError as exc: console.print(f"[red]{exc}[/red]") raise typer.Exit(1) # Get retrieval results results = retriever.retrieve( query, top_k=top_k, ) # Convert results for export from paperrag.export import export_results output_path = Path(output_path) export_results(results, output_path, format) console.print(f"[green]✓ Exported {len(results)} results to {output_path}[/green]")
# -- status ----------------------------------------------------------
[docs] @app.command() def status( index_dir: str = typer.Option( None, "--index-dir", "-i", help="Index directory (auto-discovered if not provided)", ), ) -> None: """Show index health information.""" from paperrag.parser import compute_file_hash, discover_pdfs from paperrag.vectorstore import VectorStore cfg = PaperRAGConfig() if index_dir: cfg.index_dir = index_dir idx_path = Path(cfg.index_dir).resolve() if not VectorStore.exists(idx_path): console.print("[red]No index found at specified location.[/red]") console.print("[dim]Use 'paperrag index' to create an index first.[/dim]") raise typer.Exit(1) try: store = VectorStore.load(idx_path) except Exception as e: console.print(f"[red]Error loading index: {e}[/red]") raise typer.Exit(1) # Create status table table = Table(title="✓ [bold]Index Status[/bold]") table.add_column("Metric", style="cyan", width=30) table.add_column("Value", style="green") # Vector count table.add_row("Vectors in index", str(store.index.ntotal)) table.add_row("Indexed PDFs", str(len(store.file_hashes))) table.add_row("Index version", str(store.version)) table.add_row("Embedding dimension", str(store.dimension)) # Disk size try: index_size = idx_path.stat().st_size if index_size >= 10 * 1024 * 1024: size_str = f"{index_size / (1024 * 1024):.1f} MB" elif index_size >= 1024 * 1024: size_str = f"{index_size / 1024 / 1024:.2f} MB" else: size_str = f"{index_size / 1024:.2f} KB" table.add_row("Estimated size", size_str) except Exception: table.add_row("Estimated size", "Unknown") # Timestamp try: timestamp = datetime.datetime.fromtimestamp(idx_path.stat().st_mtime).strftime( "%Y-%m-%d %H:%M:%S" ) table.add_row("Last modified", timestamp) except Exception: pass console.print("\n" + "=" * 60) console.print(table) console.print("=" * 60 + "\n") # Check for files out of sync if cfg.input_dir: pdf_dir = Path(cfg.input_dir) try: pdfs = discover_pdfs(pdf_dir) if pdfs: if not VectorStore.exists(idx_path / ".paperrag-index"): console.print( "[yellow]⚠ Index directory not found. Auto-discovery enabled.[/yellow]" ) idx_path = idx_path / ".paperrag-index" if not VectorStore.exists(idx_path): console.print( "[yellow]ℹ No index found in either location.[/yellow]" ) raise typer.Exit(0) # Detect modified/deleted files current_paths = {str(p) for p in pdfs} stored_hashes = set(store.file_hashes.keys()) modified = [] deleted = [] for pdf_path in pdfs: path_str = str(pdf_path) stored_hash = store.get_file_hash(path_str) if stored_hash is None: deleted.append(pdf_path.name) else: # Check if hash has changed by comparing against disk disk_hash = compute_file_hash(pdf_path) if stored_hash != disk_hash: modified.append(pdf_path.name) if not modified and not deleted: console.print("[green]✓ Index is fully up-to-date[/green]") else: if modified: console.print( f"[yellow]⚠ {len(modified)} file(s) have been modified[/yellow]" ) for name in modified[:5]: console.print(f" - {name}") if len(modified) > 5: console.print(f" ... and {len(modified) - 5} more") if deleted: console.print( f"[red]✗ {len(deleted)} file(s) have been deleted[/red]" ) for name in deleted[:5]: console.print(f" - {name}") if len(deleted) > 5: console.print(f" ... and {len(deleted) - 5} more") else: console.print("[yellow]ℹ No PDFs found in input directory.[/yellow]") except Exception as e: console.print(f"[red]Error checking file sync: {e}[/red]") else: console.print( "[yellow]ℹ No input directory configured. Run 'paperrag index --help' to set one.[/yellow]" )
[docs] def main() -> None: app()
if __name__ == "__main__": main()