"""趋势看板路由 — 论文统计图表页面和数据 API。""" from __future__ import annotations import logging from datetime import date, timedelta from fastapi import APIRouter, Depends, Request from fastapi.templating import Jinja2Templates from sqlalchemy import func, text from sqlalchemy.orm import Session from app.config import settings from app.database import get_db logger = logging.getLogger(__name__) router = APIRouter() templates = Jinja2Templates(directory="app/templates") @router.get("/trends") def trends_page(request: Request, db: Session = Depends(get_db)): """趋势看板页面。""" stats = _get_trends_data(db) return templates.TemplateResponse( request, "trends.html", { "page_title": "趋势看板", "stats": stats, "today": _today_str(), }, ) @router.get("/api/stats/trends") def trends_api(db: Session = Depends(get_db)): """趋势数据 JSON API。""" return _get_trends_data(db) def _get_trends_data(db: Session) -> dict: """从 DB 聚合趋势数据。""" thirty_days_ago = (date.today() - timedelta(days=30)).isoformat() # 1. 按日论文数量(近 30 天) daily_rows = db.execute(text(""" SELECT paper_date, COUNT(*) as cnt FROM papers WHERE paper_date >= :start_date GROUP BY paper_date ORDER BY paper_date ASC """), {"start_date": thirty_days_ago}).fetchall() daily_counts = [ {"date": str(row[0]), "count": row[1]} for row in daily_rows ] # 2. 热门标签 Top 20 tag_rows = db.execute(text(""" SELECT tag, COUNT(*) as cnt FROM paper_tags GROUP BY tag ORDER BY cnt DESC LIMIT 20 """)).fetchall() top_tags = [ {"tag": row[0], "count": row[1]} for row in tag_rows ] # 3. Upvotes 分布 upvote_rows = db.execute(text(""" SELECT CASE WHEN upvotes >= 100 THEN '100+' WHEN upvotes >= 50 THEN '50-99' WHEN upvotes >= 20 THEN '20-49' WHEN upvotes >= 10 THEN '10-19' WHEN upvotes >= 5 THEN '5-9' ELSE '0-4' END as bucket, COUNT(*) as cnt FROM papers GROUP BY bucket ORDER BY MIN(upvotes) DESC """)).fetchall() upvotes_dist = [ {"range": row[0], "count": row[1]} for row in upvote_rows ] # 4. 总结完成率 summary_rows = db.execute(text(""" SELECT COALESCE(ss.status, 'none') as status, COUNT(*) as cnt FROM papers p LEFT JOIN summary_status ss ON ss.paper_id = p.id GROUP BY status """)).fetchall() summary_completion = [ {"status": row[0], "count": row[1]} for row in summary_rows ] return { "daily_counts": daily_counts, "top_tags": top_tags, "upvotes_dist": upvotes_dist, "summary_completion": summary_completion, } def _today_str() -> str: from datetime import datetime from zoneinfo import ZoneInfo tz = ZoneInfo(settings.APP_TIMEZONE) return datetime.now(tz).strftime("%Y-%m-%d")