"""Interactive REPL for PaperRAG.

REPL stands for Read-Eval-Print Loop.
The REPL is a first-class mode of PaperRAG.
"""
from __future__ import annotations
import logging
import os
import re
from pathlib import Path
from prompt_toolkit import HTML, PromptSession
from prompt_toolkit.completion import CompleteEvent, Completer, Completion, PathCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory
from rich.console import Console
from rich.table import Table
from paperrag import __version__
from paperrag.config import PaperRAGConfig, load_rc, PROMPT_PRESETS, PRESET_MAX_TOKENS
# Shared Rich console; every function in this module prints through it.
console = Console()
# All slash-commands available in the REPL.
# Used by _SlashCompleter to complete command names after a leading "/".
SLASH_COMMANDS: list[str] = [
    "/index",
    "/focus",
    "/topk",
    "/threshold",
    "/temperature",
    "/max-tokens",
    "/ctx-size",
    "/n-gpu-layers",
    "/n-threads",
    "/prompt",
    "/preset",
    "/export",
    "/no-llm",
    "/think",
    "/model",
    "/config",
    "/rc",
    "/help",
    "/exit",
    "/quit",
]
# Maximum number of user/assistant turn pairs to keep in conversation history.
# 10 turns (20 messages) balances context for follow-ups while staying within
# typical 4096-token context windows when combined with retrieval context.
_MAX_HISTORY_TURNS = 10
# Help text printed by the /help command. It contains Rich console markup
# ([bold], [cyan], [dim]) and is rendered via console.print().
HELP_TEXT = """\
[bold]Available commands:[/bold]
[cyan]<any text>[/cyan] Query the indexed papers (uses top-k retrieval, with LLM unless /no-llm is active)
[cyan]/index[/cyan] Re-index the current PDF directory/file
[cyan]/index <path>[/cyan] Re-index a specific PDF file or directory
[cyan]/focus <substring>[/cyan] Focus all subsequent queries on a specific paper
[cyan]/topk <n>[/cyan] Set top-k for retrieval (default: 5)
[cyan]/threshold <n>[/cyan] Set similarity threshold 0.0-1.0 (default: 0.1)
[cyan]/temperature <n>[/cyan] Set LLM temperature 0.0-2.0 (default: 0.0)
[cyan]/max-tokens <n>[/cyan] Set LLM max output tokens (default: 1024)
[cyan]/ctx-size <n>[/cyan] Set LLM context window size (default: 4096)
[cyan]/n-gpu-layers <n>[/cyan] Set GPU layers for llama.cpp backend (0 = CPU only)
[cyan]/n-threads <n>[/cyan] Set CPU threads for llama.cpp backend (0 = auto)
[cyan]/prompt <text>[/cyan] Set LLM system prompt
[cyan]/preset <name>[/cyan] Switch to a named prompt preset: default, reviewer, summarizer, explainer
[cyan]/export[/cyan] Export this session's Q&A to a markdown file (auto-named)
[cyan]/export <path>[/cyan] Export to a specific file path
[cyan]/no-llm[/cyan] Toggle retrieval-only mode (disable LLM answers)
[cyan]/no-llm on|off[/cyan] Explicitly enable or disable retrieval-only mode
[cyan]/think[/cyan] Toggle thinking/reasoning mode (for models like Qwen3, default: off)
[cyan]/model <name>[/cyan] Switch LLM model/backend: Ollama name, local .gguf path, or HF repo (e.g. Qwen/Qwen3-1.7B-GGUF)
[cyan]/config[/cyan] Show current configuration
[cyan]/rc[/cyan] Show loaded .paperragrc files and values
[cyan]/help[/cyan] Show this help message
[cyan]/exit[/cyan] / [cyan]/quit[/cyan] Exit the REPL
[dim]Tip: type [bold]/[/bold] + Tab for autocomplete. In review mode, try [cyan]/preset reviewer[/cyan] then [cyan]/export[/cyan] to save your review.[/dim]
"""
# Commands whose argument is a filesystem path (file or directory).
# _SlashCompleter offers path completion after these commands.
_PATH_COMMANDS = {"/model", "/index", "/export"}
# Command whose argument is one of the preset names (keys of PROMPT_PRESETS).
_PRESET_COMMAND = "/preset"
class _SlashCompleter(Completer):
    """Tab-completion provider for the REPL prompt.

    Only input that begins with '/' is completed:

    - Before the first space, the slash-command name itself is completed.
    - After '/model ', '/index ' or '/export ', filesystem paths are completed.
    - After '/preset ', the names of the prompt presets are completed.
    """

    def __init__(self) -> None:
        # expanduser=True lets "~" resolve to the home directory while completing.
        self._path_completer = PathCompleter(expanduser=True)

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ):
        typed = document.text_before_cursor
        if not typed.startswith("/"):
            return

        command, separator, argument = typed.partition(" ")

        if separator:
            # A space has been typed: complete the argument, never the command.
            if command in _PATH_COMMANDS:
                # PathCompleter only needs to see the argument portion.
                argument_doc = Document(argument, cursor_position=len(argument))
                yield from self._path_completer.get_completions(
                    argument_doc, complete_event
                )
            elif command == _PRESET_COMMAND:
                already_typed = len(argument)
                for preset in PROMPT_PRESETS:
                    if preset.startswith(argument):
                        yield Completion(
                            preset[already_typed:], start_position=0, display=preset
                        )
            return

        # No space yet: offer the remainder of every matching command name.
        already_typed = len(typed)
        for candidate in SLASH_COMMANDS:
            if candidate.startswith(typed):
                yield Completion(
                    candidate[already_typed:], start_position=0, display=candidate
                )
[docs]
def start_repl(
    cfg: PaperRAGConfig | None = None,
    *,
    auto_focus: "Path | None" = None,
    review_mode: bool = False,
    output_path: "Path | None" = None,
) -> None:
    """Launch the interactive REPL session.

    Prints a startup banner, loads the existing index, eagerly builds the
    retriever, pre-warms the LLM and then reads input until the user exits.
    Input starting with ``/`` is handled as a slash-command; anything else is
    run as a query against the indexed papers.

    Args:
        cfg: Configuration to use; a default ``PaperRAGConfig()`` is created
            when omitted. Slash-commands mutate this object in place.
        auto_focus: When given, the indexed file whose name equals
            ``auto_focus.name`` (case-insensitively) becomes the initial focus.
        review_mode: Show the review-oriented hint line at startup instead of
            the generic one.
        output_path: When set and the session contains at least one Q&A
            entry, the session is exported to this path on exit.

    Raises:
        SystemExit: With status 1 when no index exists at ``cfg.index_dir``.
    """
    cfg = cfg or PaperRAGConfig()
    pdf_dir = Path(cfg.input_dir)

    from paperrag.parser import discover_pdfs

    # Discover PDFs with the parser's INFO logging temporarily silenced so the
    # banner stays clean; the previous level is always restored.
    parser_logger = logging.getLogger("paperrag.parser")
    original_level = parser_logger.level
    parser_logger.setLevel(logging.WARNING)
    try:
        pdfs = discover_pdfs(pdf_dir)
    finally:
        parser_logger.setLevel(original_level)

    console.print(f"\n[bold]PaperRAG[/bold] version [cyan]{__version__}[/cyan]")

    from paperrag.vectorstore import VectorStore

    idx_dir = Path(cfg.index_dir)

    # The REPL requires an existing index; report how many PDFs are not in it.
    loaded_store = None
    if not VectorStore.exists(idx_dir):
        console.print(f"[red]Error: No index found at {idx_dir}[/red]")
        console.print("Run [bold]paperrag index[/bold] to create an index before using the REPL.")
        raise SystemExit(1)
    try:
        loaded_store = VectorStore.load(idx_dir)
        indexed_count = len(loaded_store.file_hashes)
        unindexed_count = len(pdfs) - indexed_count
        if unindexed_count > 0:
            console.print(
                f"Found [green]{len(pdfs)}[/green] PDFs - "
                f"[yellow]{unindexed_count} unindexed[/yellow]"
                f" [dim]— run /index to add them[/dim]"
            )
        else:
            console.print(f"Found [green]{len(pdfs)}[/green] PDFs - [green]all indexed[/green]")
    except Exception as e:
        # Best effort: a broken index is reported but does not abort startup.
        console.print(f"Found [green]{len(pdfs)}[/green] PDFs")
        console.print(f"[yellow]Warning: Could not load index: {e}[/yellow]")

    console.print(f"LLM: [cyan]{cfg.llm.model_name}[/cyan] [dim]top-k={cfg.retriever.top_k} threshold={cfg.retriever.score_threshold} — /config for all settings[/dim]")
    if review_mode:
        console.print(
            "Switch prompts: [cyan]/preset reviewer[/cyan] (or summarizer, explainer). "
            "Custom: [cyan]/prompt <text>[/cyan]. Save session: [cyan]/export[/cyan].\n"
        )
    else:
        console.print("Type [cyan]/help[/cyan] for commands, or [cyan]/[/cyan] + Tab for autocomplete.\n")

    top_k = cfg.retriever.top_k
    focused_file: str | None = None
    session_log: list[dict] = []  # tracks Q&A pairs for /export
    conversation_history: list[dict] = []  # tracks messages for follow-up questions
    use_llm = True

    # Eagerly load the retriever (including embedding model) at startup
    # so the first query doesn't pay the model-loading penalty.
    # Pass the already-loaded store to avoid reading the index from disk twice.
    console.print("[dim]Loading embedding model...[/dim]", end="")
    retriever = _ensure_retriever(None, cfg, store=loaded_store)
    if retriever is not None:
        console.print(" [green]done[/green]")
    else:
        console.print(" [red]failed[/red]")

    # Pre-warm LLM so first query doesn't pay the model-loading cost
    from paperrag.llm import prewarm_ollama

    console.print(f"[dim]Warming up LLM ({cfg.llm.model_name})...[/dim]", end="")
    ok = prewarm_ollama(cfg.llm)
    console.print(" [green]done[/green]" if ok else " [dim]skipped[/dim]")

    # Auto-focus for single-PDF review sessions
    if auto_focus is not None and retriever is not None:
        all_files = sorted(retriever.store.file_hashes)
        matches = [f for f in all_files if Path(f).name.lower() == auto_focus.name.lower()]
        if matches:
            focused_file = matches[0]
            console.print(f"[green]Auto-focused on '{auto_focus.name}'[/green]")
            other_count = len(all_files) - 1
            if other_count > 0:
                console.print(
                    f"[dim]{other_count} other paper(s) also indexed — "
                    f"/focus list to browse, /focus to search all[/dim]"
                )
        else:
            console.print(
                f"[yellow]Warning: '{auto_focus.name}' not found in index — "
                f"searching all papers[/yellow]"
            )
        console.print()

    # Suppress INFO logs during interactive session to keep output clean.
    logging.getLogger().setLevel(logging.WARNING)

    # Create prompt session with history and slash-command completion
    session = PromptSession(
        history=FileHistory(str(Path.home() / ".paperrag_history")),
        completer=_SlashCompleter(),
        complete_while_typing=False,  # only complete on Tab
    )

    while True:
        try:
            if focused_file:
                short_name = Path(focused_file).name
                if len(short_name) > 20:
                    short_name = short_name[:17] + "..."
                # HTML.format() escapes its arguments, so file names containing
                # '&' or '<' cannot break the prompt markup.
                prompt_text = HTML("paperrag <ansigreen>({})</ansigreen>> ").format(short_name)
            else:
                prompt_text = "paperrag> "
            command = session.prompt(prompt_text).strip()
        except (EOFError, KeyboardInterrupt):
            console.print("\nBye!")
            if output_path and session_log:
                _export_and_report(
                    session_log, output_path, focused_file=focused_file, cfg=cfg, verb="saved"
                )
            break

        if not command:
            continue

        if command in ("/exit", "/quit"):
            console.print("Bye!")
            if output_path and session_log:
                _export_and_report(
                    session_log, output_path, focused_file=focused_file, cfg=cfg, verb="saved"
                )
            break

        if command == "/help":
            console.print(HELP_TEXT)
            continue

        cmd_parts = command.split(maxsplit=1)

        if cmd_parts[0] == "/index":
            if len(cmd_parts) == 2:
                new_path = cmd_parts[1].strip()
                path_obj = Path(new_path)
                if not path_obj.exists():
                    console.print(f"[red]Path does not exist: {new_path}[/red]")
                    continue
                cfg.input_dir = str(path_obj)
                # Reset index_dir so it auto-derives from the new input path
                cfg._index_dir = None
            _handle_index(cfg)
            retriever = None  # force reload after re-index
            focused_file = None  # reset focus as index has changed
            continue

        if cmd_parts[0] == "/focus":
            retriever = _ensure_retriever(retriever, cfg)
            if retriever is None:
                continue
            # All unique files in the index, in a stable order.
            all_files = sorted(retriever.store.file_hashes)
            if len(cmd_parts) == 1:
                focused_file = None
                console.print("[green]Focus reset: searching all indexed papers.[/green]")
                continue
            arg = cmd_parts[1].strip().lower()
            if arg == "list":
                # Show every file: the no-match hint below points users here
                # "to see more", so this must not be truncated.
                console.print(f"[bold]Indexed papers ({len(all_files)} total):[/bold]")
                for f in all_files:
                    console.print(f" - {Path(f).name}")
                continue
            # Case-insensitive substring match on the file name.
            matches = [f for f in all_files if arg in Path(f).name.lower()]
            if not matches:
                console.print(f"[red]No indexed papers match '{arg}'[/red]")
                # Show a small sample to help the user
                console.print("[dim]Available papers (sample):[/dim]")
                for f in all_files[:5]:
                    console.print(f" - {Path(f).name}")
                if len(all_files) > 5:
                    console.print(f" ... and {len(all_files) - 5} others. Use [cyan]/focus list[/cyan] to see more.")
            elif len(matches) == 1:
                focused_file = matches[0]
                console.print(f"Focus set to: [green]{Path(focused_file).name}[/green]")
            else:
                console.print(f"[yellow]Multiple matches for '{arg}':[/yellow]")
                # Show at most 10 matches to keep the output readable.
                for f in matches[:10]:
                    console.print(f" - {Path(f).name}")
                if len(matches) > 10:
                    console.print(f" ... and {len(matches) - 10} other matches.")
                console.print("[dim]Please be more specific or copy-paste a name from above.[/dim]")
            continue

        if cmd_parts[0] == "/topk":
            value = _parse_int_arg(cmd_parts)
            if value is None:
                console.print("[yellow]Usage: /topk <number>[/yellow]")
            else:
                top_k = value
                cfg.retriever.top_k = top_k
                console.print(f"top-k set to [cyan]{top_k}[/cyan]")
            continue

        if cmd_parts[0] == "/threshold":
            if len(cmd_parts) == 2:
                try:
                    threshold_val = float(cmd_parts[1])
                    if 0.0 <= threshold_val <= 1.0:
                        cfg.retriever.score_threshold = threshold_val
                        console.print(f"Threshold set to [cyan]{threshold_val}[/cyan]")
                    else:
                        console.print("[yellow]Threshold must be between 0.0 and 1.0[/yellow]")
                except ValueError:
                    console.print("[yellow]Usage: /threshold <number>[/yellow]")
            else:
                console.print("[yellow]Usage: /threshold <number>[/yellow]")
            continue

        if cmd_parts[0] == "/temperature":
            if len(cmd_parts) == 2:
                try:
                    temp_val = float(cmd_parts[1])
                    if 0.0 <= temp_val <= 2.0:
                        cfg.llm.temperature = temp_val
                        console.print(f"Temperature set to [cyan]{temp_val}[/cyan]")
                    else:
                        console.print("[yellow]Temperature must be between 0.0 and 2.0[/yellow]")
                except ValueError:
                    console.print("[yellow]Usage: /temperature <number>[/yellow]")
            else:
                console.print("[yellow]Usage: /temperature <number>[/yellow]")
            continue

        if cmd_parts[0] == "/max-tokens":
            value = _parse_int_arg(cmd_parts)
            if value is None:
                console.print("[yellow]Usage: /max-tokens <number>[/yellow]")
            else:
                cfg.llm.max_tokens = value
                console.print(f"Max tokens set to [cyan]{cfg.llm.max_tokens}[/cyan]")
            continue

        if cmd_parts[0] == "/ctx-size":
            value = _parse_int_arg(cmd_parts)
            if value is None:
                console.print("[yellow]Usage: /ctx-size <number>[/yellow]")
            elif value >= 512:
                cfg.llm.ctx_size = value
                console.print(f"Context size set to [cyan]{cfg.llm.ctx_size}[/cyan]")
            else:
                console.print("[yellow]Context size must be at least 512[/yellow]")
            continue

        if cmd_parts[0] == "/n-gpu-layers":
            # -1 is accepted because the REPL's own tip recommends
            # "/n-gpu-layers -1" to offload all layers to the GPU.
            value = _parse_int_arg(cmd_parts, minimum=-1)
            if value is None:
                console.print("[yellow]Usage: /n-gpu-layers <number> (-1 = all layers)[/yellow]")
            else:
                cfg.llm.n_gpu_layers = value
                console.print(f"GPU layers set to [cyan]{cfg.llm.n_gpu_layers}[/cyan] (takes effect on next llama-server start)")
            continue

        if cmd_parts[0] == "/n-threads":
            value = _parse_int_arg(cmd_parts)
            if value is None:
                console.print("[yellow]Usage: /n-threads <number> (0 = auto)[/yellow]")
            else:
                cfg.llm.n_threads = value
                label = str(cfg.llm.n_threads) if cfg.llm.n_threads > 0 else f"auto ({os.cpu_count()})"
                console.print(f"CPU threads set to [cyan]{label}[/cyan] (takes effect on next llama-server start)")
            continue

        if cmd_parts[0] == "/prompt":
            if len(cmd_parts) == 2:
                cfg.llm.system_prompt = cmd_parts[1].strip()
                console.print(f"System prompt set to: [dim]{cfg.llm.system_prompt}[/dim]")
            else:
                console.print("[yellow]Usage: /prompt <text>[/yellow]")
            continue

        if cmd_parts[0] == "/preset":
            if len(cmd_parts) == 2:
                name = cmd_parts[1].strip().lower()
                if name in PROMPT_PRESETS:
                    cfg.llm.system_prompt = PROMPT_PRESETS[name]
                    # A preset may carry its own max_tokens; otherwise keep the current one.
                    cfg.llm.max_tokens = PRESET_MAX_TOKENS.get(name, cfg.llm.max_tokens)
                    console.print(
                        f"Preset [cyan]{name}[/cyan] active. "
                        f"[dim]max_tokens={cfg.llm.max_tokens}[/dim]"
                    )
                    console.print(f"[dim]Prompt: {cfg.llm.system_prompt[:80]}...[/dim]")
                else:
                    valid = ", ".join(PROMPT_PRESETS.keys())
                    console.print(f"[yellow]Unknown preset '{name}'. Valid: {valid}[/yellow]")
            else:
                valid = ", ".join(PROMPT_PRESETS.keys())
                console.print(f"[yellow]Usage: /preset <name> (valid: {valid})[/yellow]")
            continue

        if cmd_parts[0] == "/export":
            if not session_log:
                console.print("[yellow]No Q&A in this session yet.[/yellow]")
                continue
            if len(cmd_parts) == 2:
                export_path = Path(cmd_parts[1].strip())
            else:
                # Auto-name the file after the focused paper plus a timestamp.
                import datetime as _dt

                stamp = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
                paper_stem = Path(focused_file).stem if focused_file else "session"
                export_path = Path(f"{paper_stem}_review_{stamp}.md")
            _export_and_report(
                session_log, export_path, focused_file=focused_file, cfg=cfg, verb="exported"
            )
            continue

        if cmd_parts[0] == "/no-llm":
            if len(cmd_parts) == 2:
                arg = cmd_parts[1].strip().lower()
                if arg == "on":
                    use_llm = False
                elif arg == "off":
                    use_llm = True
                else:
                    # The backslash escapes "[" so Rich prints "[on|off]"
                    # literally instead of consuming it as a markup tag.
                    console.print("[yellow]Usage: /no-llm \\[on|off][/yellow]")
                    continue
            else:
                use_llm = not use_llm
            if use_llm:
                console.print("LLM mode: [green]on[/green]")
            else:
                console.print("LLM mode: [yellow]off[/yellow] [dim](retrieval-only)[/dim]")
            continue

        if cmd_parts[0] == "/think":
            cfg.llm.think = not cfg.llm.think
            state = "[green]on[/green]" if cfg.llm.think else "[dim]off[/dim]"
            console.print(f"Thinking mode: {state}")
            continue

        if cmd_parts[0] == "/model":
            if len(cmd_parts) == 2:
                from paperrag.llm import _is_gguf_model, _is_hf_model

                raw = cmd_parts[1]
                # Expand ~ for local file paths
                expanded = Path(raw).expanduser()
                expanded_str = str(expanded)
                if expanded.is_dir():
                    # A directory: look for GGUF files inside it (recursively).
                    gguf_files = sorted(expanded.rglob("*.gguf"))
                    if not gguf_files:
                        console.print(f"[red]No .gguf files found in: {expanded_str}[/red]")
                    elif len(gguf_files) == 1:
                        cfg.llm.model_name = str(gguf_files[0])
                        console.print(f"Model: [cyan]{gguf_files[0].name}[/cyan] Backend: [yellow]llama.cpp[/yellow]")
                        _hint_gpu_offload(cfg)
                    else:
                        # Ambiguous: list candidates and leave the model unchanged.
                        console.print("[yellow]Multiple GGUF files found — pick one:[/yellow]")
                        for i, f in enumerate(gguf_files, 1):
                            console.print(f" [cyan]{i}.[/cyan] {f}")
                elif _is_gguf_model(expanded_str):
                    if expanded.is_file():
                        cfg.llm.model_name = expanded_str
                        console.print(f"Model: [cyan]{expanded.name}[/cyan] Backend: [yellow]llama.cpp[/yellow]")
                        _hint_gpu_offload(cfg)
                    else:
                        console.print(f"[red]GGUF file not found: {expanded_str}[/red]")
                elif _is_hf_model(expanded_str):
                    cfg.llm.model_name = expanded_str
                    console.print(f"Model: [cyan]{expanded_str}[/cyan] Backend: [yellow]llama.cpp[/yellow] [dim](GGUF downloaded from HuggingFace on first query)[/dim]")
                    _hint_gpu_offload(cfg)
                else:
                    cfg.llm.model_name = expanded_str
                    console.print(f"Model: [cyan]{expanded_str}[/cyan] Backend: [green]Ollama[/green]")
            else:
                console.print("[yellow]Usage: /model <model-name>[/yellow]")
            continue

        if command == "/config":
            console.print("\n[bold]Current Configuration:[/bold]")
            console.print("[bold]LLM:[/bold]")
            console.print(f" Model: [cyan]{cfg.llm.model_name}[/cyan]")
            console.print(f" Temperature: [cyan]{cfg.llm.temperature}[/cyan]")
            console.print(f" Max tokens: [cyan]{cfg.llm.max_tokens}[/cyan]")
            console.print(f" Context size: [cyan]{cfg.llm.ctx_size}[/cyan]")
            console.print(f" GPU layers (llama.cpp): [cyan]{cfg.llm.n_gpu_layers}[/cyan]")
            n_threads_label = str(cfg.llm.n_threads) if cfg.llm.n_threads > 0 else f"auto ({os.cpu_count()})"
            console.print(f" CPU threads (llama.cpp): [cyan]{n_threads_label}[/cyan]")
            think_label = "[green]on[/green]" if cfg.llm.think else "[dim]off[/dim]"
            console.print(f" Thinking mode: {think_label}")
            llm_calls_label = "[green]enabled[/green]" if use_llm else "[yellow]disabled[/yellow] [dim](retrieval-only)[/dim]"
            console.print(f" LLM calls: {llm_calls_label}")
            # A preset counts as active when its text equals the current system prompt.
            active_preset = next(
                (k for k, v in PROMPT_PRESETS.items() if v == cfg.llm.system_prompt), None
            )
            if active_preset:
                console.print(f" Active preset: [cyan]{active_preset}[/cyan]")
            console.print(f" System prompt: [dim]{cfg.llm.system_prompt}[/dim]")
            console.print("[bold]Retrieval:[/bold]")
            console.print(f" Embed model: [cyan]{cfg.embedder.model_name}[/cyan]")
            console.print(f" Top-k: [cyan]{cfg.retriever.top_k}[/cyan]")
            console.print(f" Threshold: [cyan]{cfg.retriever.score_threshold}[/cyan]")
            if focused_file:
                console.print(f" Focus: [green]{Path(focused_file).name}[/green]\n")
            else:
                console.print(" Focus: [dim]none (searching all papers)[/dim]\n")
            continue

        if command == "/rc":
            global_path = Path.home() / ".paperragrc"
            local_path = Path.cwd() / ".paperragrc"
            console.print("\n[bold].paperragrc files:[/bold]")
            for label, rc_path in [("Global", global_path), ("Local", local_path)]:
                if rc_path.is_file():
                    rc_data = load_rc(rc_path)
                    console.print(f" [green]{label}[/green]: {rc_path}")
                    for k, v in rc_data.items():
                        console.print(f" {k} = [cyan]{v}[/cyan]")
                else:
                    console.print(f" [dim]{label}[/dim]: {rc_path} [dim](not found)[/dim]")
            console.print()
            continue

        # Unknown slash-command: give a hint instead of treating it as a query
        if command.startswith("/"):
            console.print(
                f"[yellow]Unknown command: {cmd_parts[0]}. "
                "Type [bold]/help[/bold] to see available commands.[/yellow]"
            )
            continue

        # Anything else is treated as a query
        retriever = _ensure_retriever(retriever, cfg)
        if retriever is None:
            continue
        entry = _handle_query(
            command,
            retriever,
            cfg,
            top_k=top_k,
            focused_file=focused_file,
            use_llm=use_llm,
            conversation_history=conversation_history if use_llm else None,
        )
        if entry is not None:
            session_log.append(entry)
            # Update conversation history for follow-up questions
            if use_llm and entry.get("answer"):
                conversation_history.append({"role": "user", "content": command})
                conversation_history.append({"role": "assistant", "content": entry["answer"]})
                # Keep conversation history bounded to avoid exceeding context limits
                if len(conversation_history) > _MAX_HISTORY_TURNS * 2:
                    conversation_history[:] = conversation_history[-_MAX_HISTORY_TURNS * 2:]


def _parse_int_arg(cmd_parts: list[str], *, minimum: int = 0) -> "int | None":
    """Return the integer argument of a slash-command, or None if unusable.

    None is returned when the argument is missing, is not parseable by
    ``int()``, or is smaller than *minimum*. Using ``int()`` directly (rather
    than ``str.isdigit()``) avoids crashing on characters such as "²" that
    ``isdigit()`` accepts but ``int()`` rejects.
    """
    if len(cmd_parts) != 2:
        return None
    try:
        value = int(cmd_parts[1].strip())
    except ValueError:
        return None
    return value if value >= minimum else None


def _hint_gpu_offload(cfg: PaperRAGConfig) -> None:
    """Print the GPU-offload tip when no GPU layers are configured."""
    if cfg.llm.n_gpu_layers == 0:
        console.print("[dim]Tip: /n-gpu-layers -1 to offload all layers to GPU[/dim]")


def _export_and_report(
    session_log: list[dict],
    path: "Path",
    *,
    focused_file: "str | None",
    cfg: PaperRAGConfig,
    verb: str,
) -> None:
    """Export the session and print the outcome; never raises on I/O failure.

    *verb* completes the success message ("Session <verb> to <path>").
    """
    try:
        _export_session(session_log, path, focused_file=focused_file, cfg=cfg)
    except OSError as exc:
        # Keep the REPL (and the in-memory session) alive if the file cannot be written.
        console.print(f"[red]Could not write {path}: {exc}[/red]")
        return
    console.print(f"[green]Session {verb} to {path}[/green]")
def _ensure_retriever(retriever, cfg: PaperRAGConfig, store=None):
    """Return *retriever* if already built, otherwise construct one.

    A new ``Retriever`` is created from *cfg* (and the optional pre-loaded
    *store*). If construction raises ``FileNotFoundError`` the message is
    printed in red and ``None`` is returned.
    """
    if retriever is None:
        try:
            # Imported lazily so the module is only loaded when actually needed.
            from paperrag.retriever import Retriever

            retriever = Retriever(cfg, store=store)
        except FileNotFoundError as exc:
            console.print(f"[red]{exc}[/red]")
            return None
    return retriever
def _handle_query(
    question: str,
    retriever,
    cfg: PaperRAGConfig,
    *,
    top_k: int,
    focused_file: str | None = None,
    use_llm: bool = True,
    conversation_history: list[dict] | None = None,
) -> "dict | None":
    """Answer one user question: retrieve chunks, then optionally call the LLM.

    Returns a session-log entry (``question``, ``answer``, ``sources``) on
    success. Returns None when nothing was retrieved and no fallback applied,
    or when the LLM call failed.
    """
    import time

    started = time.perf_counter()
    results = retriever.retrieve(question, top_k=top_k, file_path=focused_file)
    t_retrieval = time.perf_counter() - started

    # Fallback 1: focused on a single paper with the LLM enabled — hand the
    # LLM every chunk of that paper instead of giving up.
    if not results and use_llm and focused_file:
        whole_paper = retriever.get_all_chunks_for_file(focused_file)
        if whole_paper:
            console.print("[dim](No retrieval match — using full paper context)[/dim]")
            results = whole_paper
            t_retrieval = time.perf_counter() - started

    if not results:
        # Fallback 2: with the LLM enabled and at least two messages of prior
        # conversation, answer from the conversation history alone.
        if use_llm and conversation_history and len(conversation_history) >= 2:
            return _handle_followup(question, cfg, conversation_history, started)
        notice = "[yellow]No results found.[/yellow]"
        if cfg.retriever.score_threshold > 0.1:
            notice += f" [dim](threshold={cfg.retriever.score_threshold} — try /threshold 0.1 to widen the search)[/dim]"
        console.print(notice)
        return None

    if not use_llm:
        # Retrieval-only mode: print every chunk as a one-line header plus a
        # whitespace-collapsed snippet capped at 200 characters.
        console.print(f"\n[bold]Retrieved Chunks[/bold] [dim]({t_retrieval:.2f}s)[/dim]")
        rendered: list[str] = []
        source_names: list[str] = []
        known_paths: set[str] = set()
        for rank, hit in enumerate(results, start=1):
            name = Path(hit.file_path).name
            if hit.file_path not in known_paths:
                known_paths.add(hit.file_path)
                source_names.append(name)
            preview = re.sub(r"\s+", " ", hit.text).strip()
            if len(preview) > 200:
                preview = preview[:197].rstrip() + "..."
            console.print(
                f" [cyan][{rank}][/cyan] {name} | {hit.section_name} | "
                f"chunk {hit.chunk_id} [dim]({hit.score:.2f})[/dim]"
            )
            console.print(f" {preview}")
            rendered.append(
                f"[{rank}] {name} | {hit.section_name} | "
                f"chunk {hit.chunk_id} | score={hit.score:.2f}\n{preview}"
            )
        elapsed = time.perf_counter() - started
        console.print(f"\n[dim]Retrieval only: {t_retrieval:.2f}s | Total: {elapsed:.2f}s[/dim]\n")
        return {
            "question": question,
            "answer": "\n\n".join(rendered),
            "sources": source_names,
        }

    # Print the sources first so the user has something to read while the
    # LLM is still generating.
    console.print(f"\n[bold]Sources[/bold] [dim]({t_retrieval:.2f}s)[/dim]")
    # One citation number per unique file, in order of first appearance.
    citation_of: dict[str, int] = {}
    for hit in results:
        citation_of.setdefault(hit.file_path, len(citation_of) + 1)
    for path_str, number in citation_of.items():
        top_score = max(hit.score for hit in results if hit.file_path == path_str)
        console.print(f" [cyan][{number}][/cyan] {Path(path_str).name} [dim]({top_score:.2f})[/dim]")

    pieces: list[str] = []
    try:
        import sys
        from paperrag.llm import stream_answer

        texts = [hit.text for hit in results]
        paths = [hit.file_path for hit in results]
        announced = False
        llm_started = time.perf_counter()
        stream = stream_answer(
            question,
            texts,
            cfg.llm,
            source_files=paths,
            conversation_history=conversation_history,
        )
        for piece in stream:
            if not announced:
                # Print the header only once the first token arrives.
                console.print("\n[bold green]Answer:[/bold green]")
                announced = True
            sys.stdout.write(piece)
            sys.stdout.flush()
            pieces.append(piece)
        sys.stdout.write("\n\n")
        sys.stdout.flush()
        now = time.perf_counter()
        t_llm = now - llm_started
        t_total = time.perf_counter() - started
        console.print(f"[dim]Retrieval: {t_retrieval:.2f}s | LLM: {t_llm:.2f}s | Total: {t_total:.2f}s[/dim]\n")
    except ImportError as exc:
        console.print(f"[yellow]{exc}[/yellow]")
        return None
    except ValueError as exc:
        # Treated as "LLM not configured": show the message dimmed and move on.
        console.print(f"\n[dim]💡 {exc}[/dim]\n")
        return None
    except Exception as exc:
        from paperrag.llm import describe_llm_error

        error_msg, hint = describe_llm_error(exc, cfg.llm.model_name)
        console.print(f"[red]{error_msg}[/red]")
        if hint:
            console.print(f"[yellow]Fix: {hint}[/yellow]")
        return None

    return {
        "question": question,
        "answer": "".join(pieces),
        "sources": [Path(path_str).name for path_str in citation_of],
    }
def _handle_followup(
    question: str,
    cfg: PaperRAGConfig,
    conversation_history: list[dict],
    t0: float,
) -> "dict | None":
    """Answer a follow-up from conversation history alone (no retrieved sources).

    *t0* is the ``time.perf_counter()`` value at which the query started and
    is used for the reported total time. Returns a session-log entry with an
    empty ``sources`` list, or None when the LLM call fails.
    """
    import sys
    import time
    from paperrag.llm import stream_followup

    console.print("[dim](No new sources found — answering from conversation history)[/dim]")

    pieces: list[str] = []
    try:
        announced = False
        llm_started = time.perf_counter()
        for piece in stream_followup(question, conversation_history, cfg.llm):
            if not announced:
                # Print the header only once the first token arrives.
                console.print("\n[bold green]Answer:[/bold green]")
                announced = True
            sys.stdout.write(piece)
            sys.stdout.flush()
            pieces.append(piece)
        sys.stdout.write("\n\n")
        sys.stdout.flush()
        t_llm = time.perf_counter() - llm_started
        t_total = time.perf_counter() - t0
        console.print(f"[dim]LLM: {t_llm:.2f}s | Total: {t_total:.2f}s[/dim]\n")
    except ImportError as exc:
        console.print(f"[yellow]{exc}[/yellow]")
        return None
    except Exception as exc:
        from paperrag.llm import describe_llm_error

        error_msg, hint = describe_llm_error(exc, cfg.llm.model_name)
        console.print(f"[red]{error_msg}[/red]")
        if hint:
            console.print(f"[yellow]Fix: {hint}[/yellow]")
        return None

    return {
        "question": question,
        "answer": "".join(pieces),
        "sources": [],
    }
def _handle_index(cfg: PaperRAGConfig) -> None:
    """Run the incremental indexing pipeline from inside the REPL.

    Steps:
      1. Discover PDFs under ``cfg.input_dir`` (a directory or a single file).
      2. Load the existing index, or start a new one when none exists or the
         embedding dimension differs from the stored one.
      3. Drop files that no longer exist (skipped when the input is a single
         file, so indexing one PDF does not evict the others).
      4. Re-hash every PDF and (re)index those that are new or changed.
      5. Parse and chunk in parallel, then embed and add sequentially, saving
         a checkpoint every ``cfg.indexing.checkpoint_interval`` indexed PDFs.

    The index version is bumped and the store saved whenever anything changed.
    """
    from paperrag.embedder import Embedder
    from paperrag.parser import compute_file_hashes_parallel, discover_pdfs
    from paperrag.parallel import parallel_process_pdfs
    from paperrag.vectorstore import VectorStore

    pdf_dir = Path(cfg.input_dir)
    idx_dir = Path(cfg.index_dir)
    pdfs = discover_pdfs(pdf_dir)
    if not pdfs:
        console.print("[red]No PDFs found.[/red]")
        return

    is_single_file = pdf_dir.is_file()
    embedder = Embedder(cfg.embedder)
    if VectorStore.exists(idx_dir):
        store = VectorStore.load(idx_dir)
        if store.dimension != embedder.dimension:
            # Stored vectors are incompatible with the current embedder: start over.
            store = VectorStore(idx_dir, embedder.dimension)
    else:
        store = VectorStore(idx_dir, embedder.dimension)

    # Remove deleted files from the index (skip for single-file mode)
    stale: list[str] = []
    if not is_single_file:
        current_paths = {str(p) for p in pdfs}
        stale = [fp for fp in list(store.file_hashes) if fp not in current_paths]
        for fp in stale:
            store.remove_by_file(fp)
            del store.file_hashes[fp]
    if stale:
        console.print(f"Removed [red]{len(stale)}[/red] deleted file(s) from index.")

    # Determine which files need (re)indexing - use parallel hashing
    n_workers = cfg.indexing.get_n_workers()
    if len(pdfs) == 1:
        console.print(f"Checking [cyan]{pdfs[0].name}[/cyan] for changes...")
    else:
        console.print(f"Checking [cyan]{len(pdfs)}[/cyan] PDFs for changes...")
    pdf_hashes = compute_file_hashes_parallel(pdfs, n_workers)

    to_index: list[Path] = []
    for pdf in pdfs:
        current_hash = pdf_hashes.get(str(pdf))
        stored_hash = store.get_file_hash(str(pdf))
        if stored_hash is None or stored_hash != current_hash:
            if stored_hash is not None:
                # Changed file: drop its old chunks before re-adding.
                store.remove_by_file(str(pdf))
            to_index.append(pdf)

    if not to_index and not stale:
        console.print("[green]Index is up-to-date.[/green]")
        return
    if not to_index:
        # Only deletions happened: persist them under a new version.
        store.version += 1
        store.save(config=cfg)
        console.print(f"[green]Done![/green] Index version: {store.version}")
        return

    total = len(to_index)
    console.print(f"Parsing [cyan]{total}[/cyan] PDF(s) with {n_workers} workers...")
    # Parallel parse + chunk phase
    parsed_results = parallel_process_pdfs(
        to_index,
        cfg.parser,
        cfg.chunker,
        n_workers,
        timeout=cfg.indexing.pdf_timeout,
    )

    # Sequential embed + add phase
    console.print("Embedding and indexing chunks...")
    total_chunks = 0
    indexed_files = 0  # PDFs successfully added during this run (cumulative)
    since_checkpoint = 0  # PDFs added since the last successful checkpoint
    checkpoint_interval = cfg.indexing.checkpoint_interval
    for i, (pdf_path, file_hash, chunks, error) in enumerate(parsed_results, 1):
        console.print(f" [{i}/{total}] {pdf_path.name}", highlight=False)
        if error:
            console.print(f" [red]Error: {error}[/red]")
            continue
        if not chunks:
            console.print(" [yellow]No chunks produced, skipping.[/yellow]")
            continue
        embeddings = embedder.embed([c.text for c in chunks])
        store.add(embeddings, chunks)
        store.set_file_hash(str(pdf_path), file_hash)
        total_chunks += len(chunks)
        indexed_files += 1
        since_checkpoint += 1
        console.print(f" [green]{len(chunks)} chunks[/green]")

        # Periodic checkpoint
        if checkpoint_interval > 0 and since_checkpoint >= checkpoint_interval:
            try:
                store.save(config=cfg)
                # Report cumulative totals so both numbers describe the same span.
                console.print(f" [dim]Checkpoint saved ({indexed_files} PDFs, {total_chunks} chunks)[/dim]")
                since_checkpoint = 0
            except Exception as e:
                # Best effort: a failed checkpoint is retried after the next PDF.
                console.print(f" [yellow]Checkpoint save failed: {e}[/yellow]")

    store.version += 1
    store.save(config=cfg)

    # Count only PDFs that were actually added; failed or empty ones are
    # reported separately instead of being presented as indexed.
    skipped = total - indexed_files
    summary = (
        f"[green]Done![/green] Indexed {total_chunks} chunks from "
        f"{indexed_files} file(s)."
    )
    if skipped:
        summary += f" [yellow]{skipped} file(s) skipped.[/yellow]"
    summary += f" Index version: {store.version}"
    console.print(summary)
def _export_session(
    session_log: list[dict],
    output_path: "Path",
    *,
    focused_file: "str | None" = None,
    cfg: "PaperRAGConfig | None" = None,
) -> None:
    """Write the session's Q&A log to *output_path* as a markdown document.

    The file starts with a header (paper name, date, and — when *cfg* is
    given — the active preset and model name), followed by one numbered
    section per entry of *session_log*. Missing parent directories are
    created. Each entry must provide ``question`` and ``answer``; a non-empty
    ``sources`` list is rendered as an italic line.
    """
    import datetime as _dt

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    title = "Multiple papers" if not focused_file else Path(focused_file).name

    # The active preset is whichever preset text equals the current system prompt.
    active_preset = None
    if cfg is not None:
        for preset_name, preset_text in PROMPT_PRESETS.items():
            if preset_text == cfg.llm.system_prompt:
                active_preset = preset_name
                break

    def _document():
        """Yield the markdown document piece by piece, in output order."""
        yield "# PaperRAG Review Session\n\n"
        yield f"**Paper:** {title} \n"
        yield f"**Date:** {_dt.datetime.now().strftime('%Y-%m-%d %H:%M')} \n"
        if active_preset:
            yield f"**Preset:** {active_preset} \n"
        if cfg is not None:
            yield f"**Model:** {cfg.llm.model_name} \n"
        yield "\n---\n\n"

        last = len(session_log)
        for number, item in enumerate(session_log, 1):
            yield f"## Q{number}: {item['question']}\n\n"
            if item.get("sources"):
                joined = ", ".join(item["sources"])
                yield f"*Sources: {joined}*\n\n"
            yield f"{item['answer']}\n\n"
            # Separator between entries, but not after the final one.
            if number < last:
                yield "---\n\n"

    with open(target, "w", encoding="utf-8") as handle:
        handle.writelines(_document())