743d69efd0
- Move DB operations from routes/admin.py to services/admin.py (get_logs_context, query_summary_statuses, retry_failed, delete/reset operations) - Add services/jobs.py with Job/JobEvent-based async job queue (create_job, run_job, enqueue_job) - Add services/derived.py with FTS5 reindex and paper index deletion helpers - Refactor scheduler to use job queue instead of direct pipeline calls - Add heartbeat_at/expires_at to TaskLock for lock health tracking - Remove DESIGN_REVIEW.md - Update tests: remove redundant integration tests, add unit tests for new services
237 lines
7.3 KiB
Python
237 lines
7.3 KiB
Python
"""CLI 工具 — 手动抓取论文。"""
|
||
|
||
import asyncio
|
||
import logging
|
||
|
||
import typer
|
||
from dotenv import load_dotenv
|
||
|
||
# Load .env first: the `app` modules are imported lazily inside each command,
# so the environment is populated before any of them is imported.
load_dotenv()

# Root Typer application; the commands below register themselves on it.
cli_app = typer.Typer(help="HF Daily Papers 管理 CLI")
|
||
|
||
|
||
@cli_app.command()
def crawl(
    date_str: str = typer.Argument(
        None,
        help="抓取日期 (YYYY-MM-DD),留空则自动探测",
    ),
    top_n: int = typer.Option(None, "--top", "-n", help="取前 N 篇"),
    force: bool = typer.Option(False, "--force", "-f", help="强制重抓(即使已抓取过)"),
):
    """手动抓取指定日期的 HuggingFace Daily Papers。"""
    # NOTE: the docstring above is kept verbatim because Typer displays it as
    # this command's help text.
    #
    # Manually run a "crawl_daily" job for one date.
    #
    # Args:
    #     date_str: Target date (YYYY-MM-DD). When omitted, today is tried
    #         first and yesterday is used as a fallback.
    #     top_n: Forwarded to the job payload as ``top_n``.
    #     force: Bypass the "papers already stored for this date" skip checks.
    #
    # Raises:
    #     typer.Exit: with code 1 when the final job result is not "success".
    import os

    from app.config import settings
    from app.database import SessionLocal, engine
    from app.database import init_db as _init
    from app.models import Paper
    from app.services.jobs import create_job, run_job
    from app.utils import today_str, yesterday_str
    from sqlalchemy import func, select

    target = date_str or today_str()

    # Make sure the database directory and tables exist before opening a session.
    os.makedirs(settings.db_path.parent, exist_ok=True)
    _init(engine)

    db = SessionLocal()

    def _count_papers(day):
        """Return the number of stored papers whose paper_date equals ``day``."""
        stmt = select(func.count(Paper.id)).where(Paper.paper_date == day)
        return db.scalar(stmt) or 0

    def _run_crawl(day):
        """Create a crawl_daily job for ``day`` and run it to completion."""
        job = create_job(
            db,
            "crawl_daily",
            owner="cli_crawl",
            payload={"target_date": day, "top_n": top_n},
        )
        return asyncio.run(run_job(db, job.id))

    def _echo_skip(day, count):
        """Tell the user that ``day`` was skipped because it already has papers."""
        typer.echo(f"⏭️ {day} 已有 {count} 篇论文,跳过(用 --force 强制重抓)")

    try:
        # Skip check only applies to auto-detected dates without --force;
        # an explicitly given date is always crawled.
        if not force and not date_str:
            existing = _count_papers(target)
            if existing > 0:
                _echo_skip(target, existing)
                return

        typer.echo(f"📡 开始抓取 {target} ...")
        result = _run_crawl(target)

        # With no explicit date, fall back to yesterday when today's job
        # failed or found nothing.
        need_fallback = not date_str and (
            result["status"] == "failed" or result["found"] == 0
        )
        if need_fallback:
            fallback = yesterday_str()
            # Honour --force here as well: previously this branch skipped and
            # told the user to pass --force even when it was already given.
            existing = 0 if force else _count_papers(fallback)
            if existing > 0:
                _echo_skip(fallback, existing)
            else:
                typer.echo(f"🔄 {target} 无数据,尝试 {fallback} ...")
                target = fallback
                result = _run_crawl(target)

        if result["status"] == "success":
            typer.echo(
                f"✅ 抓取完成:发现 {result['found']} 篇,新增 {result['new']} 篇"
            )
        else:
            # .get keeps the failure path from raising KeyError when the
            # result carries no "error" entry (same access style as the
            # other commands in this module).
            typer.echo(f"❌ 抓取失败:{result.get('error')}", err=True)
            raise typer.Exit(code=1)
    finally:
        db.close()
|
||
|
||
|
||
@cli_app.command()
def summarize(
    arxiv_id: str = typer.Argument(
        None,
        help="指定论文 arXiv ID;留空则批量处理所有 pending",
    ),
    pdf_mode: str = typer.Option(
        "auto",
        "--pdf-mode",
        help="PDF 传递方式:auto(自动选择)| inject(全量注入)| search(pi 自主搜索)",
    ),
    backend: str = typer.Option(
        None,
        "--backend",
        help="总结后端:pi | claude(留空则使用 .env 配置)",
    ),
):
    """手动触发 AI 总结。"""
    # NOTE: the docstring above is kept verbatim because Typer displays it as
    # this command's help text.
    #
    # Manually run a summarization job: "summarize_one" when an arXiv ID is
    # given, otherwise "summarize_batch".
    #
    # Raises:
    #     typer.Exit: with code 1 on invalid options, a "failed" job result,
    #         or a NotFoundError / ConflictError raised while running the job.
    from app.config import settings
    from app.database import SessionLocal, engine
    from app.database import init_db as _init
    from app.services.jobs import create_job, run_job

    import os

    allowed_modes = ("auto", "inject", "search")
    if pdf_mode not in allowed_modes:
        typer.echo(
            f"❌ 无效的 pdf_mode: {pdf_mode},只支持 auto / inject / search", err=True
        )
        raise typer.Exit(code=1)

    allowed_backends = ("pi", "claude")
    if backend and backend not in allowed_backends:
        typer.echo(f"❌ 无效的 backend: {backend},只支持 pi / claude", err=True)
        raise typer.Exit(code=1)
    if backend:
        # Override the configured backend for this process only.
        settings.SUMMARY_BACKEND = backend

    os.makedirs(settings.db_path.parent, exist_ok=True)
    _init(engine)

    # Send log records to the terminal while the job runs.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-5s %(name)s | %(message)s",
        datefmt="%H:%M:%S",
    )

    from app.exceptions import ConflictError, NotFoundError

    session = SessionLocal()
    try:
        # Pick the banner, job type and payload first, then create the job once.
        if arxiv_id:
            banner = f"🤖 开始总结 {arxiv_id} (mode={pdf_mode}) ..."
            job_type = "summarize_one"
            payload = {"arxiv_id": arxiv_id, "pdf_mode": pdf_mode, "force": False}
        else:
            banner = f"🤖 开始批量总结 pending 论文 (mode={pdf_mode}) ..."
            job_type = "summarize_batch"
            payload = {"pdf_mode": pdf_mode}

        typer.echo(banner)
        job = create_job(session, job_type, owner="cli_summarize", payload=payload)

        outcome = asyncio.run(run_job(session, job.id))
        if outcome.get("status") == "failed":
            typer.echo(f"❌ 总结失败:{outcome.get('error')}", err=True)
            raise typer.Exit(code=1)
        typer.echo(f"✅ 总结完成:{outcome}")
    except NotFoundError as exc:
        typer.echo(f"❌ {exc.message}", err=True)
        raise typer.Exit(code=1) from exc
    except ConflictError as exc:
        typer.echo(f"⚠️ {exc.message}", err=True)
        raise typer.Exit(code=1) from exc
    finally:
        session.close()
|
||
|
||
|
||
@cli_app.command()
def init_db():
    """初始化数据库表。"""
    # NOTE: the docstring above is kept verbatim because Typer displays it as
    # this command's help text.
    #
    # Create the database directory if needed, initialise the tables through
    # app.database.init_db, and print the database path.
    import os

    from app.config import settings
    from app.database import engine
    from app.database import init_db as _init

    db_file = settings.db_path
    os.makedirs(db_file.parent, exist_ok=True)
    _init(engine)
    typer.echo(f"✅ 数据库已初始化:{db_file}")
|
||
|
||
|
||
@cli_app.command("rebuild-derived")
def rebuild_derived(
    fts: bool = typer.Option(False, "--fts", help="重建 FTS5 全文索引"),
    chroma: bool = typer.Option(False, "--chroma", help="重建 ChromaDB 语义索引"),
):
    """重建可派生数据索引。"""
    # NOTE: the docstring above is kept verbatim because Typer displays it as
    # this command's help text.
    #
    # Run one reindex job per selected index ("reindex_fts", "reindex_chroma"),
    # in that order, printing each result and stopping at the first failure.
    #
    # Raises:
    #     typer.Exit: with code 1 as soon as a job reports status "failed".
    import os

    from app.config import settings
    from app.database import SessionLocal, engine
    from app.database import init_db as _init
    from app.services.jobs import create_job, run_job

    # Neither flag given: default to rebuilding the FTS index only.
    if not (fts or chroma):
        fts = True

    os.makedirs(settings.db_path.parent, exist_ok=True)
    _init(engine)

    job_types = []
    if fts:
        job_types.append("reindex_fts")
    if chroma:
        job_types.append("reindex_chroma")

    session = SessionLocal()
    try:
        for job_type in job_types:
            job = create_job(
                session, job_type, owner="cli_rebuild_derived", payload={}
            )
            outcome = asyncio.run(run_job(session, job.id))
            typer.echo(f"{job_type}: {outcome}")
            if outcome.get("status") == "failed":
                raise typer.Exit(code=1)
    finally:
        session.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Executed as a script: hand control to the Typer application.
    cli_app()
|