"""CLI 工具 — 手动抓取论文。""" import asyncio import logging import typer from dotenv import load_dotenv # 在导入 app 模块前加载 .env load_dotenv() cli_app = typer.Typer(help="HF Daily Papers 管理 CLI") @cli_app.command() def crawl( date_str: str = typer.Argument( None, help="抓取日期 (YYYY-MM-DD),留空则自动探测", ), top_n: int = typer.Option(None, "--top", "-n", help="取前 N 篇"), force: bool = typer.Option(False, "--force", "-f", help="强制重抓(即使已抓取过)"), ): """手动抓取指定日期的 HuggingFace Daily Papers。""" from app.config import settings from app.database import SessionLocal, engine from app.database import init_db as _init from app.models import Paper from app.services.jobs import create_job, run_job from app.utils import today_str, yesterday_str from sqlalchemy import func, select target = date_str or today_str() # 确保数据库和表存在 import os os.makedirs(settings.db_path.parent, exist_ok=True) _init(engine) db = SessionLocal() try: # 检查是否已抓取过(非 force 模式) if not force and not date_str: existing = ( db.scalar( select(func.count(Paper.id)).where(Paper.paper_date == target) ) or 0 ) if existing > 0: typer.echo( f"⏭️ {target} 已有 {existing} 篇论文,跳过(用 --force 强制重抓)" ) return typer.echo(f"📡 开始抓取 {target} ...") job = create_job( db, "crawl_daily", owner="cli_crawl", payload={"target_date": target, "top_n": top_n}, ) result = asyncio.run(run_job(db, job.id)) # 未指定日期且今天失败或无数据时,自动回退到昨天 need_fallback = not date_str and ( result["status"] == "failed" or result["found"] == 0 ) if need_fallback: fallback = yesterday_str() existing = ( db.scalar( select(func.count(Paper.id)).where(Paper.paper_date == fallback) ) or 0 ) if existing > 0: typer.echo( f"⏭️ {fallback} 已有 {existing} 篇论文,跳过(用 --force 强制重抓)" ) else: typer.echo(f"🔄 {target} 无数据,尝试 {fallback} ...") target = fallback job = create_job( db, "crawl_daily", owner="cli_crawl", payload={"target_date": target, "top_n": top_n}, ) result = asyncio.run(run_job(db, job.id)) if result["status"] == "success": typer.echo( f"✅ 抓取完成:发现 {result['found']} 篇,新增 {result['new']} 篇" ) else: typer.echo(f"❌ 抓取失败:{result['error']}", err=True) raise typer.Exit(code=1) finally: db.close() @cli_app.command() def summarize( arxiv_id: str = typer.Argument( None, help="指定论文 arXiv ID;留空则批量处理所有 pending", ), pdf_mode: str = typer.Option( "auto", "--pdf-mode", help="PDF 传递方式:auto(自动选择)| inject(全量注入)| search(pi 自主搜索)", ), backend: str = typer.Option( None, "--backend", help="总结后端:pi | claude(留空则使用 .env 配置)", ), ): """手动触发 AI 总结。""" from app.config import settings from app.database import SessionLocal, engine from app.database import init_db as _init from app.services.jobs import create_job, run_job import os if pdf_mode not in ("auto", "inject", "search"): typer.echo( f"❌ 无效的 pdf_mode: {pdf_mode},只支持 auto / inject / search", err=True ) raise typer.Exit(code=1) if backend: if backend not in ("pi", "claude"): typer.echo(f"❌ 无效的 backend: {backend},只支持 pi / claude", err=True) raise typer.Exit(code=1) settings.SUMMARY_BACKEND = backend os.makedirs(settings.db_path.parent, exist_ok=True) _init(engine) # 配置 logging 输出到终端 logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-5s %(name)s | %(message)s", datefmt="%H:%M:%S", ) from app.exceptions import ConflictError, NotFoundError db = SessionLocal() try: if arxiv_id: typer.echo(f"🤖 开始总结 {arxiv_id} (mode={pdf_mode}) ...") job = create_job( db, "summarize_one", owner="cli_summarize", payload={"arxiv_id": arxiv_id, "pdf_mode": pdf_mode, "force": False}, ) else: typer.echo(f"🤖 开始批量总结 pending 论文 (mode={pdf_mode}) ...") job = create_job( db, "summarize_batch", owner="cli_summarize", payload={"pdf_mode": pdf_mode}, ) result = asyncio.run(run_job(db, job.id)) if result.get("status") == "failed": typer.echo(f"❌ 总结失败:{result.get('error')}", err=True) raise typer.Exit(code=1) typer.echo(f"✅ 总结完成:{result}") except NotFoundError as exc: typer.echo(f"❌ {exc.message}", err=True) raise typer.Exit(code=1) from exc except ConflictError as exc: typer.echo(f"⚠️ {exc.message}", err=True) raise typer.Exit(code=1) from exc finally: db.close() @cli_app.command() def init_db(): """初始化数据库表。""" from app.config import settings from app.database import engine from app.database import init_db as _init import os os.makedirs(settings.db_path.parent, exist_ok=True) _init(engine) typer.echo(f"✅ 数据库已初始化:{settings.db_path}") @cli_app.command("rebuild-derived") def rebuild_derived( fts: bool = typer.Option(False, "--fts", help="重建 FTS5 全文索引"), chroma: bool = typer.Option(False, "--chroma", help="重建 ChromaDB 语义索引"), ): """重建可派生数据索引。""" from app.config import settings from app.database import SessionLocal, engine from app.database import init_db as _init from app.services.jobs import create_job, run_job import os if not fts and not chroma: fts = True os.makedirs(settings.db_path.parent, exist_ok=True) _init(engine) db = SessionLocal() try: for job_type in [ *(["reindex_fts"] if fts else []), *(["reindex_chroma"] if chroma else []), ]: job = create_job(db, job_type, owner="cli_rebuild_derived", payload={}) result = asyncio.run(run_job(db, job.id)) typer.echo(f"{job_type}: {result}") if result.get("status") == "failed": raise typer.Exit(code=1) finally: db.close() if __name__ == "__main__": cli_app()