673 lines
22 KiB
Python
673 lines
22 KiB
Python
"""管理接口 — 仪表盘、抓取、总结、清理、删除、日志,需要登录鉴权。"""
|
||
|
||
from __future__ import annotations

import csv
import hashlib
import hmac
import io
from datetime import date
from urllib.parse import urlencode

from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    Form,
    HTTPException,
    Query,
    Request,
)
from fastapi.responses import RedirectResponse, Response
from pydantic import BaseModel, field_validator
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.services import admin as admin_svc
from app.services.admin import get_admin_stats
from app.services.cleaner import cleanup_tmp
from app.services.jobs import create_job, enqueue_job
from app.utils import templates, today_str
|
||
|
||
# Router for the admin endpoints; every route declared on it is served under /admin.
router = APIRouter(prefix="/admin", tags=["admin"])
|
||
|
||
|
||
# ── 认证 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _check_password(password: str) -> bool:
    """Return True when *password* matches the configured admin password.

    ``settings.ADMIN_PASSWORD`` may hold either the plaintext password or the
    hex-encoded SHA-256 digest of it; both forms are accepted. An empty or
    unset stored password never matches.

    Args:
        password: The password submitted by the user.

    Returns:
        True if the password matches the stored value in either form.
    """
    stored = settings.ADMIN_PASSWORD
    if not stored:
        return False

    # hmac.compare_digest() raises TypeError when given str arguments that
    # contain non-ASCII characters, so compare UTF-8 bytes instead. This keeps
    # the constant-time comparison while accepting any Unicode password.
    candidate = password.encode("utf-8")
    expected = stored.encode("utf-8")
    if hmac.compare_digest(candidate, expected):
        return True

    # The stored value may also be the SHA-256 hex digest of the password.
    digest = hashlib.sha256(candidate).hexdigest().encode("ascii")
    return hmac.compare_digest(digest, expected)
|
||
|
||
|
||
async def verify_admin(request: Request) -> None:
    """Dependency that only lets logged-in admins through.

    Reads the ``is_admin`` flag from the session. When the flag is missing or
    falsy, an ``HTTPException`` with status 303 and a ``Location`` header
    pointing at the login page is raised.
    """
    logged_in = request.session.get("is_admin")
    if logged_in:
        return
    raise HTTPException(status_code=303, headers={"Location": "/admin/login"})
|
||
|
||
|
||
# ── 登录 / 登出 ──────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/login")
async def admin_login_page(request: Request):
    """Render the login page, or redirect to the admin home if already logged in."""
    already_logged_in = request.session.get("is_admin")
    if not already_logged_in:
        return templates.TemplateResponse(request, "login.html", {"error": None})
    return RedirectResponse("/admin/", status_code=303)
|
||
|
||
|
||
@router.post("/login")
async def admin_login_submit(
    request: Request,
    username: str = Form(""),
    password: str = Form(""),
):
    """Handle the login form submission.

    On matching credentials the session is flagged with ``is_admin`` and the
    client is redirected to the admin home; otherwise the login page is
    rendered again with an error message.
    """
    # The password is only checked when the username already matches.
    credentials_ok = username == settings.ADMIN_USERNAME and _check_password(
        password
    )
    if not credentials_ok:
        failure_context = {"error": "用户名或密码错误"}
        return templates.TemplateResponse(request, "login.html", failure_context)

    request.session["is_admin"] = True
    return RedirectResponse("/admin/", status_code=303)
|
||
|
||
|
||
@router.post("/logout")
async def admin_logout(request: Request):
    """Log out by clearing the session, then redirect to the login page."""
    session = request.session
    session.clear()
    login_redirect = RedirectResponse("/admin/login", status_code=303)
    return login_redirect
|
||
|
||
|
||
# ── 仪表盘 ──────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/")
async def admin_dashboard(
    request: Request,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Render the admin dashboard with admin stats and scheduler history."""
    context = {
        "stats": get_admin_stats(db),
        "scheduler_history": admin_svc.get_scheduler_history(db),
    }
    return templates.TemplateResponse(request, "admin_dashboard.html", context)
|
||
|
||
|
||
# ── 调度器 ──────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/scheduler-status")
async def admin_scheduler_status(_admin: None = Depends(verify_admin)):
    """Return the scheduler status as reported by the admin service."""
    scheduler_status = admin_svc.get_scheduler_status()
    return scheduler_status
|
||
|
||
|
||
@router.post("/trigger-pipeline")
async def admin_trigger_pipeline(
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Manually queue a ``pipeline_daily`` job targeting today's date.

    Per the original note the pipeline runs crawl -> summarize -> cleanup;
    this handler only creates and enqueues the job.
    """
    job_payload = {"target_date": today_str()}
    job = create_job(
        db, "pipeline_daily", owner="admin_trigger", payload=job_payload
    )
    enqueue_job(background_tasks, job.id)
    return {
        "status": "queued",
        "job_id": job.id,
        "message": "流水线任务已创建",
    }
|
||
|
||
|
||
@router.post("/refresh-upvotes")
async def admin_refresh_upvotes(
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    days: int | None = Query(None, description="刷新最近 N 天,默认使用配置值"),
):
    """Queue a ``refresh_upvotes`` job for the most recent *days* days.

    ``days`` is forwarded unchanged in the job payload; ``None`` leaves the
    choice of window to the job itself.
    """
    job_payload = {"days": days}
    job = create_job(
        db, "refresh_upvotes", owner="admin_refresh", payload=job_payload
    )
    enqueue_job(background_tasks, job.id)
    return {"status": "queued", "job_id": job.id}
|
||
|
||
|
||
# ── 请求模型 ──────────────────────────────────────────────────────────
|
||
|
||
|
||
class DeleteRequest(BaseModel):
    """Request body for deleting papers within a date range."""

    # First date of the range to delete.
    date_start: date
    # Last date of the range to delete.
    date_end: date
    # Forwarded to the delete job as ``include_notes``.
    include_notes: bool = True
    # Safety confirmation; must be exactly "DELETE".
    confirm: str

    @field_validator("confirm")
    @classmethod
    def confirm_must_be_delete(cls, v: str) -> str:
        """Accept only the literal string ``DELETE`` as confirmation."""
        if v == "DELETE":
            return v
        raise ValueError("confirm must be 'DELETE' to proceed")
|
||
|
||
|
||
# ── 抓取 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/crawl")
async def admin_crawl(
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    date: str | None = Query(None, description="YYYY-MM-DD,默认今天"),
):
    """Queue a ``crawl_daily`` job for the given date, defaulting to today.

    The date string is passed through to the job payload as-is; a missing or
    empty value falls back to ``today_str()``.
    """
    target_date = date if date else today_str()
    job_payload = {"target_date": target_date}
    job = create_job(db, "crawl_daily", owner="admin_crawl", payload=job_payload)
    enqueue_job(background_tasks, job.id)
    return {"status": "queued", "job_id": job.id, "target_date": target_date}
|
||
|
||
|
||
# ── 总结 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/summarize")
async def admin_summarize_batch(
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue a ``summarize_batch`` job using the configured PDF mode.

    Per the original note the job summarizes all pending papers.
    """
    job_payload = {"pdf_mode": settings.SUMMARY_PDF_MODE}
    job = create_job(
        db, "summarize_batch", owner="admin_summarize", payload=job_payload
    )
    enqueue_job(background_tasks, job.id)
    return {"status": "queued", "job_id": job.id}
|
||
|
||
|
||
@router.post("/summarize/{arxiv_id}")
async def admin_summarize_single(
    arxiv_id: str,
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue a ``summarize_one`` job for a single paper.

    The job payload always sets ``force`` to True and carries the configured
    PDF mode.
    """
    job_payload = {
        "arxiv_id": arxiv_id,
        "force": True,
        "pdf_mode": settings.SUMMARY_PDF_MODE,
    }
    job = create_job(
        db, "summarize_one", owner="admin_summarize", payload=job_payload
    )
    enqueue_job(background_tasks, job.id)
    return {"status": "queued", "job_id": job.id, "arxiv_id": arxiv_id}
|
||
|
||
|
||
# ── 清理 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/cleanup")
async def admin_cleanup(
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue a ``cleanup_tmp`` job with an empty payload.

    Per the original note the job removes temporary files older than 24 hours
    from data/tmp/; that policy lives in the job, not in this handler.
    """
    cleanup_job = create_job(db, "cleanup_tmp", owner="admin_cleanup", payload={})
    enqueue_job(background_tasks, cleanup_job.id)
    return {"status": "queued", "job_id": cleanup_job.id}
|
||
|
||
|
||
@router.post("/cleanup-now")
async def admin_cleanup_now(
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Run the temp-file cleanup synchronously and return its result.

    Per the original note this endpoint is kept for tests and local
    troubleshooting.

    Raises:
        HTTPException: status 500 carrying the error text when the cleanup
            fails for any reason.
    """
    try:
        return admin_svc.run_cleanup_now(db, cleanup_tmp)
    except Exception as exc:
        # Chain the original exception so the real traceback stays attached
        # to the HTTP error in logs and debuggers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ── 删除 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/delete")
async def admin_delete(
    body: DeleteRequest,
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue a ``delete_range`` job for papers within a date range.

    The request model already enforces ``confirm == "DELETE"``; this handler
    additionally rejects ranges whose start lies after their end.

    Raises:
        HTTPException: status 400 when ``date_start`` is later than ``date_end``.
    """
    range_is_inverted = body.date_start > body.date_end
    if range_is_inverted:
        raise HTTPException(status_code=400, detail="date_start must be <= date_end")

    # Dates are serialized to ISO strings for the job payload.
    job_payload = {
        "date_start": body.date_start.isoformat(),
        "date_end": body.date_end.isoformat(),
        "include_notes": body.include_notes,
    }
    job = create_job(db, "delete_range", owner="admin_delete", payload=job_payload)
    enqueue_job(background_tasks, job.id)
    return {"status": "queued", "job_id": job.id}
|
||
|
||
|
||
@router.get("/jobs/{job_id}")
async def admin_job_detail(
    job_id: int,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Return the detail record of one background job.

    Raises:
        HTTPException: status 404 when the admin service returns nothing for
            ``job_id``.
    """
    job_detail = admin_svc.get_job_detail(db, job_id)
    if job_detail:
        return job_detail
    raise HTTPException(status_code=404, detail=f"Job not found: {job_id}")
|
||
|
||
|
||
# ── 任务监控 ──────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/jobs")
async def admin_jobs(
    request: Request,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    status: str = Query("all"),
    job_type: str = Query("all"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Render the background-job monitoring page.

    Args:
        request: Incoming request; its query parameters seed the pagination
            links.
        status: Job status filter passed to the admin service.
        job_type: Job type filter passed to the admin service.
        page: 1-based page number.
        per_page: Page size (1-100).
    """
    jobs, total = admin_svc.query_jobs(
        db, status=status, job_type=job_type, page=page, per_page=per_page
    )
    counts = admin_svc.get_job_status_counts(db)

    def pagination_url(p: int) -> str:
        """Build the URL of page *p*, keeping the current query parameters."""
        params = dict(request.query_params)
        params["page"] = str(p)
        # Query values arrive already decoded, so they must be re-encoded;
        # joining them raw produced broken links for values containing
        # '&', '=', spaces or non-ASCII characters.
        return "/admin/jobs?" + urlencode(params)

    return templates.TemplateResponse(
        request,
        "admin_jobs.html",
        {
            "jobs": jobs,
            "total": total,
            "page": page,
            "per_page": per_page,
            "current_status": status,
            "current_type": job_type,
            "status_counts": counts,
            "pagination_url": pagination_url,
        },
    )
|
||
|
||
|
||
# ── 锁管理 ────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/locks/{lock_id}/release")
async def admin_release_lock(
    lock_id: int,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Force-release a task lock by id.

    Raises:
        HTTPException: status 404 when the admin service reports that nothing
            was released.
    """
    released = admin_svc.force_release_lock(db, lock_id)
    if released:
        return {"status": "success", "lock_id": lock_id}
    raise HTTPException(
        status_code=404, detail=f"Lock not found or already released: {lock_id}"
    )
|
||
|
||
|
||
# ── 重抓 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.post("/paper-recrawl/{arxiv_id}")
async def admin_paper_recrawl(
    arxiv_id: str,
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue a ``recrawl_one`` job for a single paper.

    Per the original note the job re-fetches the full metadata of a paper
    that already exists.
    """
    job_payload = {"arxiv_id": arxiv_id}
    recrawl_job = create_job(
        db, "recrawl_one", owner="admin_recrawl", payload=job_payload
    )
    enqueue_job(background_tasks, recrawl_job.id)
    return {"status": "queued", "job_id": recrawl_job.id, "arxiv_id": arxiv_id}
|
||
|
||
|
||
# ── 索引重建 ──────────────────────────────────────────────────────────
|
||
|
||
|
||
class RebuildIndexRequest(BaseModel):
    """Request body selecting which search index to rebuild."""

    # One of "fts", "chroma" or "both".
    target: str

    @field_validator("target")
    @classmethod
    def target_must_be_valid(cls, v: str) -> str:
        """Reject any target other than the three supported values."""
        if v in ("fts", "chroma", "both"):
            return v
        raise ValueError("target must be 'fts', 'chroma' or 'both'")
|
||
|
||
|
||
@router.post("/rebuild-indexes")
async def admin_rebuild_indexes(
    body: RebuildIndexRequest,
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Queue index rebuild jobs for the requested target.

    ``fts`` queues a ``reindex_fts`` job, ``chroma`` a ``reindex_chroma`` job,
    and ``both`` queues the two in that order. The ids of the created jobs are
    returned in creation order.
    """
    # (index name, job type) pairs, in the order the jobs are created.
    rebuild_plan = (("fts", "reindex_fts"), ("chroma", "reindex_chroma"))

    job_ids: list[int] = []
    for index_name, job_type in rebuild_plan:
        if body.target not in (index_name, "both"):
            continue
        job = create_job(db, job_type, owner="admin_reindex", payload={})
        enqueue_job(background_tasks, job.id)
        job_ids.append(job.id)

    return {"status": "queued", "job_ids": job_ids, "target": body.target}
|
||
|
||
|
||
# ── 导出 CSV ──────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/papers/export.csv")
async def admin_papers_export(
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    q: str = Query(""),
    date_from: str | None = Query(None),
    date_to: str | None = Query(None),
    tag: str = Query(""),
    summary_status: str = Query("all"),
    sort: str = Query("date_desc"),
):
    """Export the papers matching the current filters as a CSV download.

    The filters are the same ones accepted by the paper list page. The output
    starts with a UTF-8 byte order mark so spreadsheet tools such as Excel
    detect the encoding. Authors and tags are joined with ``;`` inside their
    cells.

    Returns:
        A ``text/csv`` response served as an attachment named
        ``papers_<YYYYMMDD>.csv``.
    """
    papers, _total, statuses = admin_svc.query_papers(
        db,
        q=q,
        date_from=date_from,
        date_to=date_to,
        tag=tag,
        summary_status=summary_status,
        sort=sort,
        page=1,
        # Very large page size so a single query returns every match.
        per_page=10**6,
    )

    buf = io.StringIO()
    # Write the BOM as an explicit escape: a literal U+FEFF inside the quotes
    # is invisible and easily stripped by editors and tooling, which leaves an
    # empty write and silently drops the BOM.
    buf.write("\ufeff")
    writer = csv.writer(buf)
    writer.writerow(
        [
            "arxiv_id",
            "title_en",
            "title_zh",
            "paper_date",
            "upvotes",
            "summary_status",
            "authors",
            "tags",
            "pdf_url",
        ]
    )
    # NOTE(review): cell values are written verbatim; if titles can come from
    # untrusted sources, consider guarding against spreadsheet formula
    # injection (cells starting with '=', '+', '-', '@').
    for paper in papers:
        authors = ";".join(a.name for a in paper.authors)
        tags = ";".join(t.tag for t in paper.tags)
        writer.writerow(
            [
                paper.arxiv_id,
                paper.title_en or "",
                paper.title_zh or "",
                str(paper.paper_date) if paper.paper_date else "",
                paper.upvotes or 0,
                statuses.get(paper.arxiv_id, "none"),
                authors,
                tags,
                paper.pdf_url or "",
            ]
        )

    filename = f"papers_{today_str().replace('-', '')}.csv"
    return Response(
        content=buf.getvalue(),
        media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
||
|
||
|
||
# ── 日志 ──────────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/logs")
async def admin_logs(
    request: Request,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Render the task log page.

    The template context comes entirely from the admin service; per the
    original note it covers CrawlLog and DataDeleteJob records plus summary
    status statistics.
    """
    logs_context = admin_svc.get_logs_context(db, page=page, per_page=per_page)
    return templates.TemplateResponse(request, "admin_logs.html", logs_context)
|
||
|
||
|
||
# ── 总结状态管理 ────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/summary-status")
async def admin_summary_status(
    request: Request,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    status: str = Query("all"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """List summary statuses as an HTML fragment or as serialized data.

    Requests carrying the header ``HX-Request: true`` receive the
    ``partials/summary_list.html`` fragment; all other requests receive the
    output of ``admin_svc.serialize_summary_statuses``.
    """
    results, total = admin_svc.query_summary_statuses(
        db, status=status, page=page, per_page=per_page
    )

    # HTMX marks its requests with this header.
    wants_fragment = request.headers.get("HX-Request") == "true"
    if not wants_fragment:
        return admin_svc.serialize_summary_statuses(
            results, total=total, page=page, per_page=per_page
        )

    fragment_context = {
        "results": results,
        "total": total,
        "page": page,
        "per_page": per_page,
        "current_status": status,
    }
    return templates.TemplateResponse(
        request, "partials/summary_list.html", fragment_context
    )
|
||
|
||
|
||
@router.post("/summary-retry-failed")
async def admin_summary_retry_failed(
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Retry all failed summaries and report how many were reset."""
    count = admin_svc.retry_failed_summaries(db)
    if count:
        return {
            "status": "success",
            "message": f"已重置 {count} 个失败任务为待总结状态",
            "count": count,
        }
    # Nothing was reset; report a zero count.
    return {"status": "success", "message": "没有失败的任务需要重试", "count": 0}
|
||
|
||
|
||
# ── 论文管理 ────────────────────────────────────────────────────────
|
||
|
||
|
||
@router.get("/papers")
async def admin_papers(
    request: Request,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
    q: str = Query("", description="搜索标题/摘要"),
    date_from: str | None = Query(None),
    date_to: str | None = Query(None),
    tag: str = Query(""),
    summary_status: str = Query("all"),
    sort: str = Query("date_desc"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Render the paper management list page.

    Args:
        request: Incoming request; its query parameters seed the pagination
            links.
        q: Search text passed to the admin service.
        date_from: Optional date filter passed to the admin service.
        date_to: Optional date filter passed to the admin service.
        tag: Tag filter passed to the admin service.
        summary_status: Summary status filter.
        sort: Sort key.
        page: 1-based page number.
        per_page: Page size (1-100).
    """
    papers, total, statuses = admin_svc.query_papers(
        db,
        q=q,
        date_from=date_from,
        date_to=date_to,
        tag=tag,
        summary_status=summary_status,
        sort=sort,
        page=page,
        per_page=per_page,
    )

    def pagination_url(p: int) -> str:
        """Build the URL of page *p*, keeping the current query parameters."""
        params = dict(request.query_params)
        params["page"] = str(p)
        # Query values arrive already decoded, so they must be re-encoded;
        # joining them raw produced broken links for search text containing
        # '&', '=', spaces or non-ASCII characters.
        return "/admin/papers?" + urlencode(params)

    return templates.TemplateResponse(
        request,
        "admin_papers.html",
        {
            "papers": papers,
            "paper_summary_statuses": statuses,
            "total": total,
            "page": page,
            "per_page": per_page,
            "current_status": summary_status,
            "current_sort": sort,
            "pagination_url": pagination_url,
        },
    )
|
||
|
||
|
||
@router.post("/paper-delete/{arxiv_id}")
async def admin_paper_delete(
    arxiv_id: str,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Delete a single paper by arXiv id.

    Raises:
        HTTPException: status 404 when the admin service reports that nothing
            was deleted.
    """
    deleted = admin_svc.delete_paper_by_arxiv(db, arxiv_id)
    if deleted:
        return {"status": "success", "message": f"已删除 {arxiv_id}"}
    raise HTTPException(status_code=404, detail=f"Paper not found: {arxiv_id}")
|
||
|
||
|
||
class BatchActionRequest(BaseModel):
    """Request body for a batch operation on several papers."""

    # One of "delete", "summarize" or "recrawl".
    action: str
    # arXiv ids of the papers the action applies to.
    arxiv_ids: list[str]

    @field_validator("action")
    @classmethod
    def action_must_be_valid(cls, v: str) -> str:
        """Reject any action other than the three supported values."""
        if v in ("delete", "summarize", "recrawl"):
            return v
        raise ValueError("action must be 'delete', 'summarize' or 'recrawl'")
|
||
|
||
|
||
@router.post("/papers-batch-action")
async def admin_papers_batch_action(
    body: BatchActionRequest,
    background_tasks: BackgroundTasks,
    _admin: None = Depends(verify_admin),
    db: Session = Depends(get_db),
):
    """Apply a batch action (delete / summarize / recrawl) to several papers.

    ``delete`` and ``summarize`` are carried out directly through the admin
    service and report how many papers were affected; ``recrawl`` queues a
    ``recrawl_batch`` job instead.

    Raises:
        HTTPException: status 400 when ``arxiv_ids`` is empty.
    """
    arxiv_ids = body.arxiv_ids
    if not arxiv_ids:
        raise HTTPException(status_code=400, detail="arxiv_ids 不能为空")

    action = body.action

    if action == "delete":
        count = admin_svc.delete_papers_by_arxiv_ids(db, arxiv_ids)
        return {
            "status": "success",
            "message": f"已删除 {count} 篇论文",
            "count": count,
        }

    if action == "summarize":
        count = admin_svc.reset_summaries_pending(db, arxiv_ids)
        return {
            "status": "success",
            "message": f"已将 {count} 篇论文重置为待总结",
            "count": count,
        }

    if action == "recrawl":
        job = create_job(
            db,
            "recrawl_batch",
            owner="admin_recrawl",
            payload={"arxiv_ids": arxiv_ids},
        )
        enqueue_job(background_tasks, job.id)
        return {
            "status": "queued",
            "job_id": job.id,
            "count": len(arxiv_ids),
            "message": f"已将 {len(arxiv_ids)} 篇论文加入重抓队列",
        }
|