feat: add search and user data routes, services, and tests

This commit is contained in:
2026-06-05 22:53:27 +08:00
parent 29e6797c12
commit 1538d564f6
14 changed files with 1633 additions and 13 deletions
+4
View File
@@ -11,6 +11,8 @@ from app.database import engine
from app.models import init_db from app.models import init_db
from app.routes.admin import router as admin_router from app.routes.admin import router as admin_router
from app.routes.pages import router as pages_router from app.routes.pages import router as pages_router
from app.routes.search import router as search_router
from app.routes.user import router as user_router
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG if settings.APP_DEBUG else logging.INFO, level=logging.DEBUG if settings.APP_DEBUG else logging.INFO,
@@ -43,6 +45,8 @@ def create_app() -> FastAPI:
# 路由 # 路由
app.include_router(pages_router) app.include_router(pages_router)
app.include_router(admin_router) app.include_router(admin_router)
app.include_router(search_router)
app.include_router(user_router)
return app return app
+249
View File
@@ -0,0 +1,249 @@
"""搜索、阅读列表、RSS Feed 路由。"""
from __future__ import annotations
import math
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from xml.sax.saxutils import escape
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import Response
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from sqlalchemy.orm import Session, joinedload
from app.config import settings
from app.database import get_db
from app.models import Paper, PaperTag, UserReadingStatus
from app.services.searcher import get_all_tags, search_papers
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
# ── 搜索页 ────────────────────────────────────────────────────────────
@router.get("/search")
def search_page(
request: Request,
q: str = Query(default=""),
tag: str = Query(default=""),
sort: str = Query(default="relevance"),
page: int = Query(default=1, ge=1),
db: Session = Depends(get_db),
):
"""搜索页面。"""
result = search_papers(db, query=q or None, tag=tag or None, sort=sort, page=page)
all_tags = get_all_tags(db)
return templates.TemplateResponse(
request,
"search.html",
{
"query": q,
"tag": tag,
"sort": sort,
"results": result["results"],
"snippets": result["snippets"],
"total": result["total"],
"page": result["page"],
"total_pages": result["total_pages"],
"all_tags": all_tags,
"page_title": f"搜索: {q}" if q else "搜索",
"today": _today_str(),
},
)
# ── 搜索 JSON API ─────────────────────────────────────────────────────
@router.get("/api/search")
def search_api(
q: str = Query(default=""),
tag: str = Query(default=""),
sort: str = Query(default="relevance"),
page: int = Query(default=1, ge=1),
db: Session = Depends(get_db),
):
"""搜索 JSON API。"""
result = search_papers(db, query=q or None, tag=tag or None, sort=sort, page=page)
items = []
for paper in result["results"]:
snippet = result["snippets"].get(paper.id, {})
items.append(
{
"arxiv_id": paper.arxiv_id,
"title_en": paper.title_en,
"title_zh": paper.title_zh,
"paper_date": paper.paper_date.isoformat() if paper.paper_date else None,
"upvotes": paper.upvotes,
"tags": [t.tag for t in paper.tags],
"authors": [a.name for a in paper.authors],
"snippet_title_zh": snippet.get("title_zh"),
"snippet_abstract": snippet.get("abstract"),
}
)
return {
"results": items,
"total": result["total"],
"page": result["page"],
"total_pages": result["total_pages"],
}
# ── 阅读列表 ──────────────────────────────────────────────────────────
@router.get("/reading-list")
def reading_list_page(
request: Request,
filter: str = Query(default="all"),
tag: str = Query(default=""),
db: Session = Depends(get_db),
):
"""阅读列表页面。"""
papers = _query_reading_list(db, filter, tag or None)
all_tags = get_all_tags(db)
return templates.TemplateResponse(
request,
"reading_list.html",
{
"papers": papers,
"current_filter": filter,
"current_tag": tag,
"all_tags": all_tags,
"page_title": "阅读列表",
"today": _today_str(),
},
)
def _query_reading_list(
db: Session,
filter_type: str,
tag: str | None,
) -> list[Paper]:
"""根据筛选条件查询阅读列表。"""
from sqlalchemy import or_
# 基础:有任意用户数据的论文
base = db.query(Paper).filter(
or_(
Paper.bookmark.has(),
Paper.reading_status.has(),
Paper.note.has(),
)
)
# 应用筛选
if filter_type == "has_note":
base = base.filter(Paper.note.has())
elif filter_type in ("unread", "skimmed", "read_summary", "read_full"):
base = base.filter(
Paper.reading_status.has(UserReadingStatus.status == filter_type)
)
# 应用标签
if tag:
base = base.filter(Paper.tags.any(PaperTag.tag == tag))
return (
base.options(
joinedload(Paper.authors),
joinedload(Paper.tags),
joinedload(Paper.summary_status),
joinedload(Paper.bookmark),
joinedload(Paper.reading_status),
joinedload(Paper.note),
)
.order_by(Paper.paper_date.desc(), Paper.upvotes.desc())
.all()
)
# ── RSS Feed ──────────────────────────────────────────────────────────
@router.get("/rss.xml")
def rss_feed(
tag: str = Query(default=""),
db: Session = Depends(get_db),
):
"""RSS 2.0 Feed — 最近 7 天论文。"""
seven_days_ago = date.today() - timedelta(days=7)
query = (
db.query(Paper)
.filter(Paper.paper_date >= seven_days_ago)
.options(
joinedload(Paper.authors),
joinedload(Paper.tags),
joinedload(Paper.summary),
)
.order_by(Paper.paper_date.desc(), Paper.upvotes.desc())
)
if tag:
query = query.filter(Paper.tags.any(PaperTag.tag == tag))
papers = query.all()
xml = _generate_rss_xml(papers, settings.BASE_URL, tag or None)
return Response(content=xml, media_type="application/xml")
def _generate_rss_xml(papers: list[Paper], base_url: str, tag: str | None) -> str:
"""生成 RSS 2.0 XML。"""
lines = ['<?xml version="1.0" encoding="UTF-8"?>']
lines.append('<rss version="2.0">')
lines.append(" <channel>")
channel_title = "HF Daily Papers"
if tag:
channel_title += f"{tag}"
lines.append(f" <title>{escape(channel_title)}</title>")
lines.append(f" <link>{escape(base_url)}</link>")
lines.append(" <description>HuggingFace Daily Papers — 中文论文导览站</description>")
lines.append(f" <language>zh-CN</language>")
for paper in papers:
title_text = paper.title_zh or paper.title_en
link = f"{base_url}/paper/{paper.arxiv_id}"
desc = ""
if paper.summary and paper.summary.one_line:
desc = paper.summary.one_line
elif paper.abstract:
desc = paper.abstract[:500]
pub_date = ""
if paper.paper_date:
# RFC 822 格式
pub_date = paper.paper_date.strftime("%a, %d %b %Y 00:00:00 +0800")
lines.append(" <item>")
lines.append(f" <title>{escape(title_text)}</title>")
lines.append(f" <link>{escape(link)}</link>")
lines.append(f" <description>{escape(desc)}</description>")
if pub_date:
lines.append(f" <pubDate>{pub_date}</pubDate>")
lines.append(f" <guid>{escape(link)}</guid>")
lines.append(" </item>")
lines.append(" </channel>")
lines.append("</rss>")
return "\n".join(lines)
# ── 工具函数 ──────────────────────────────────────────────────────────
def _today_str() -> str:
"""当前日期字符串(按 APP_TIMEZONE)。"""
tz = ZoneInfo(settings.APP_TIMEZONE)
return datetime.now(tz).strftime("%Y-%m-%d")
+103
View File
@@ -0,0 +1,103 @@
"""用户数据 JSON API — 收藏、阅读状态、笔记。"""
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.services.user_data import (
get_note,
save_note,
set_reading_status,
toggle_bookmark,
)
router = APIRouter(prefix="/api", tags=["user-data"])
# ── 请求模型 ──────────────────────────────────────────────────────────
class ReadingStatusRequest(BaseModel):
status: str
class NoteRequest(BaseModel):
content: str
# ── 收藏 ──────────────────────────────────────────────────────────────
@router.post("/bookmark/{arxiv_id}")
def bookmark_toggle(arxiv_id: str, request: Request, db: Session = Depends(get_db)):
"""切换收藏状态。支持 HTMX 局部刷新和 JSON 响应。"""
result = toggle_bookmark(db, arxiv_id)
if "error" in result:
raise HTTPException(status_code=404, detail=result["error"])
# HTMX 请求 → 返回 HTML 片段
if request.headers.get("HX-Request"):
star = "" if result["bookmarked"] else ""
active_class = " active" if result["bookmarked"] else ""
html = (
f'<button class="btn-bookmark{active_class}" '
f'hx-post="/api/bookmark/{arxiv_id}" '
f'hx-target="#user-data-{arxiv_id}" '
f'hx-swap="outerHTML">'
f"{star}</button>"
)
return HTMLResponse(content=html)
return result
# ── 阅读状态 ──────────────────────────────────────────────────────────
@router.post("/reading-status/{arxiv_id}")
def reading_status_update(
arxiv_id: str,
body: ReadingStatusRequest,
db: Session = Depends(get_db),
):
"""更新阅读状态。"""
result = set_reading_status(db, arxiv_id, body.status)
if "error" in result:
if result["error"] == "not_found":
raise HTTPException(status_code=404, detail="Paper not found")
elif result["error"] == "invalid_status":
raise HTTPException(
status_code=422,
detail=f"Invalid status. Valid: {result['valid']}",
)
return result
# ── 笔记 ──────────────────────────────────────────────────────────────
@router.get("/note/{arxiv_id}")
def note_get(arxiv_id: str, db: Session = Depends(get_db)):
"""获取笔记。"""
result = get_note(db, arxiv_id)
if result is None:
raise HTTPException(status_code=404, detail="Paper not found")
return result
@router.post("/note/{arxiv_id}")
def note_save(arxiv_id: str, body: NoteRequest, db: Session = Depends(get_db)):
"""保存笔记。"""
result = save_note(db, arxiv_id, body.content)
if "error" in result:
raise HTTPException(status_code=404, detail=result["error"])
return result
+230
View File
@@ -0,0 +1,230 @@
"""FTS5 全文搜索服务 — 关键词 + 标签筛选,命中片段高亮,分页。"""
from __future__ import annotations
import math
import re
from sqlalchemy import text
from sqlalchemy.orm import Session, joinedload
from app.models import Paper
# ── 输入清洗 ──────────────────────────────────────────────────────────
# FTS5 查询语法中的特殊字符,用户输入时需要移除
_FTS5_SPECIAL = re.compile(r'["{}()^+:]')
def _sanitize_query(raw: str) -> str:
"""清洗用户输入,生成安全的 FTS5 MATCH 表达式。
- 移除 FTS5 特殊字符
- 按空白拆分为 token,用 AND 连接
- 空字符串返回 None
"""
cleaned = _FTS5_SPECIAL.sub("", raw.strip())
tokens = cleaned.split()
if not tokens:
return None
return " AND ".join(tokens)
# ── 核心搜索 ──────────────────────────────────────────────────────────
def search_papers(
db: Session,
*,
query: str | None = None,
tag: str | None = None,
sort: str = "relevance",
page: int = 1,
page_size: int = 20,
) -> dict:
"""FTS5 搜索论文。
返回::
{
"results": list[Paper],
"snippets": dict[int, dict], # paper_id → {title_zh, abstract}
"total": int,
"page": int,
"total_pages": int,
}
"""
match_expr = _sanitize_query(query) if query else None
# ── 无关键词 + 无标签 → 空结果 ──
if not match_expr and not tag:
return {
"results": [],
"snippets": {},
"total": 0,
"page": page,
"total_pages": 0,
}
# ── 构建条件性 JOIN 和 WHERE 片段 ──
tag_join = ""
tag_where = ""
tag_params: dict = {}
if tag:
tag_join = "JOIN paper_tags pt ON pt.paper_id = p.id"
tag_where = "AND pt.tag = :tag"
tag_params["tag"] = tag
offset = (page - 1) * page_size
if match_expr:
return _search_with_fts(
db, match_expr, tag_join, tag_where, tag_params,
sort, page, page_size, offset,
)
else:
return _search_tag_only(
db, tag, sort, page, page_size, offset,
)
def _search_with_fts(
db: Session,
match_expr: str,
tag_join: str,
tag_where: str,
tag_params: dict,
sort: str,
page: int,
page_size: int,
offset: int,
) -> dict:
"""有关键词时的 FTS5 MATCH 搜索。"""
params = {"query": match_expr, "limit": page_size, "offset": offset}
params.update(tag_params)
order = "bm25(papers_fts)" if sort == "relevance" else "p.paper_date DESC, p.upvotes DESC"
# ── 主查询:取 ID + rank + snippet ──
rows_sql = text(f"""
SELECT
p.id,
papers_fts.rank,
snippet(papers_fts, 1, '<mark>', '</mark>', '...', 32) AS snippet_title_zh,
snippet(papers_fts, 2, '<mark>', '</mark>', '...', 32) AS snippet_abstract
FROM papers_fts
JOIN papers p ON p.id = papers_fts.rowid
{tag_join}
WHERE papers_fts MATCH :query
{tag_where}
ORDER BY {order}
LIMIT :limit OFFSET :offset
""")
fts_rows = db.execute(rows_sql, params).fetchall()
# ── 计数查询 ──
count_sql = text(f"""
SELECT COUNT(DISTINCT papers_fts.rowid)
FROM papers_fts
JOIN papers p ON p.id = papers_fts.rowid
{tag_join}
WHERE papers_fts MATCH :query
{tag_where}
""")
total = db.execute(count_sql, params).scalar() or 0
paper_ids = [row[0] for row in fts_rows]
snippets = {
row[0]: {"title_zh": row[2], "abstract": row[3]}
for row in fts_rows
}
papers = _load_papers_by_ids(db, paper_ids, sort, {row[0]: row[1] for row in fts_rows})
return {
"results": papers,
"snippets": snippets,
"total": total,
"page": page,
"total_pages": math.ceil(total / page_size) if total else 0,
}
def _search_tag_only(
db: Session,
tag: str,
sort: str,
page: int,
page_size: int,
offset: int,
) -> dict:
"""只有标签筛选,无关键词。"""
order = "p.paper_date DESC, p.upvotes DESC" if sort == "date" else "p.paper_date DESC, p.upvotes DESC"
rows_sql = text(f"""
SELECT p.id
FROM papers p
JOIN paper_tags pt ON pt.paper_id = p.id
WHERE pt.tag = :tag
ORDER BY {order}
LIMIT :limit OFFSET :offset
""")
rows = db.execute(rows_sql, {"tag": tag, "limit": page_size, "offset": offset}).fetchall()
count_sql = text("""
SELECT COUNT(DISTINCT p.id)
FROM papers p
JOIN paper_tags pt ON pt.paper_id = p.id
WHERE pt.tag = :tag
""")
total = db.execute(count_sql, {"tag": tag}).scalar() or 0
paper_ids = [row[0] for row in rows]
papers = _load_papers_by_ids(db, paper_ids)
return {
"results": papers,
"snippets": {},
"total": total,
"page": page,
"total_pages": math.ceil(total / page_size) if total else 0,
}
def _load_papers_by_ids(
db: Session,
paper_ids: list[int],
sort: str | None = None,
rank_map: dict[int, float] | None = None,
) -> list[Paper]:
"""根据 ID 列表加载完整 ORM 对象,保持原始排序。"""
if not paper_ids:
return []
papers = (
db.query(Paper)
.filter(Paper.id.in_(paper_ids))
.options(
joinedload(Paper.authors),
joinedload(Paper.tags),
joinedload(Paper.summary_status),
joinedload(Paper.bookmark),
joinedload(Paper.reading_status),
)
.all()
)
# 按 FTS rank / tag-only 原始顺序排列
id_order = {pid: idx for idx, pid in enumerate(paper_ids)}
papers.sort(key=lambda p: id_order.get(p.id, 0))
return papers
# ── 辅助查询 ──────────────────────────────────────────────────────────
def get_all_tags(db: Session) -> list[str]:
"""返回所有不重复的标签,按字母排序。"""
rows = db.execute(
text("SELECT DISTINCT tag FROM paper_tags ORDER BY tag")
).fetchall()
return [row[0] for row in rows]
+115
View File
@@ -0,0 +1,115 @@
"""用户数据服务 — 收藏、阅读状态、个人笔记。无账号体系,数据写入本地 SQLite。"""
from __future__ import annotations
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from app.models import Paper, UserBookmark, UserNote, UserReadingStatus
# ── 收藏 ──────────────────────────────────────────────────────────────
def toggle_bookmark(db: Session, arxiv_id: str) -> dict:
"""切换收藏状态。返回 {"bookmarked": bool, "arxiv_id": str}。"""
paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first()
if not paper:
return {"error": "not_found"}
existing = db.query(UserBookmark).filter(UserBookmark.paper_id == paper.id).first()
if existing:
db.delete(existing)
db.commit()
return {"bookmarked": False, "arxiv_id": arxiv_id}
else:
bookmark = UserBookmark(
paper_id=paper.id,
created_at=datetime.now(timezone.utc),
)
db.add(bookmark)
db.commit()
return {"bookmarked": True, "arxiv_id": arxiv_id}
# ── 阅读状态 ──────────────────────────────────────────────────────────
VALID_STATUSES = {"unread", "skimmed", "read_summary", "read_full"}
def set_reading_status(db: Session, arxiv_id: str, status: str) -> dict:
"""设置阅读状态。status 必须是 unread/skimmed/read_summary/read_full。"""
if status not in VALID_STATUSES:
return {"error": "invalid_status", "valid": sorted(VALID_STATUSES)}
paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first()
if not paper:
return {"error": "not_found"}
now = datetime.now(timezone.utc)
existing = (
db.query(UserReadingStatus)
.filter(UserReadingStatus.paper_id == paper.id)
.first()
)
if existing:
existing.status = status
existing.updated_at = now
else:
db.add(
UserReadingStatus(
paper_id=paper.id,
status=status,
updated_at=now,
)
)
db.commit()
return {"arxiv_id": arxiv_id, "status": status}
# ── 笔记 ──────────────────────────────────────────────────────────────
def get_note(db: Session, arxiv_id: str) -> dict | None:
"""获取笔记。返回 {"arxiv_id", "content", "updated_at"} 或 None(论文不存在时)。"""
paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first()
if not paper:
return None
note = db.query(UserNote).filter(UserNote.paper_id == paper.id).first()
if not note:
return {"arxiv_id": arxiv_id, "content": "", "updated_at": None}
return {
"arxiv_id": arxiv_id,
"content": note.content,
"updated_at": note.updated_at.isoformat() if note.updated_at else None,
}
def save_note(db: Session, arxiv_id: str, content: str) -> dict:
"""创建或更新笔记。返回 {"arxiv_id", "content", "updated_at"}。"""
paper = db.query(Paper).filter(Paper.arxiv_id == arxiv_id).first()
if not paper:
return {"error": "not_found"}
now = datetime.now(timezone.utc)
existing = db.query(UserNote).filter(UserNote.paper_id == paper.id).first()
if existing:
existing.content = content
existing.updated_at = now
else:
db.add(
UserNote(
paper_id=paper.id,
content=content,
created_at=now,
updated_at=now,
)
)
db.commit()
return {
"arxiv_id": arxiv_id,
"content": content,
"updated_at": now.isoformat(),
}
+220
View File
@@ -326,13 +326,233 @@ a:hover { color: var(--accent-hover); text-decoration: underline; }
border-top: 1px solid var(--border); border-top: 1px solid var(--border);
} }
/* ── Responsive ─────────────────────────────────────────────────── */
/* ── Nav Search ──────────────────────────────────────────────────── */
.nav-search {
display: flex;
align-items: center;
margin-right: auto;
}
.nav-search-input {
padding: 5px 12px;
border: 1px solid var(--border);
border-radius: var(--radius);
font-size: 0.85rem;
background: var(--bg);
color: var(--ink);
width: 180px;
transition: border-color 0.2s;
font-family: var(--font-sans);
}
.nav-search-input:focus {
outline: none;
border-color: var(--accent);
}
/* ── Search Page ────────────────────────────────────────────────── */
.search-form {
display: flex;
gap: 8px;
margin-bottom: 20px;
}
.search-input {
flex: 1;
padding: 10px 16px;
border: 1px solid var(--border);
border-radius: var(--radius);
font-size: 1rem;
font-family: var(--font-sans);
background: var(--surface);
color: var(--ink);
}
.search-input:focus {
outline: none;
border-color: var(--accent);
box-shadow: 0 0 0 3px rgba(45, 95, 138, 0.1);
}
.search-btn {
padding: 10px 24px;
background: var(--accent);
color: #fff;
border: none;
border-radius: var(--radius);
font-size: 0.9rem;
font-weight: 500;
cursor: pointer;
transition: background 0.2s;
}
.search-btn:hover { background: var(--accent-hover); }
/* ── Tag Filter ─────────────────────────────────────────────────── */
.tag-filter {
margin-bottom: 16px;
display: flex;
align-items: center;
gap: 6px;
flex-wrap: wrap;
}
.tag-filter-label {
font-size: 0.85rem;
color: var(--ink-light);
}
.tag-chip {
display: inline-block;
padding: 4px 10px;
background: var(--surface);
border: 1px solid var(--border);
border-radius: 4px;
font-size: 0.8rem;
color: var(--ink-light);
}
.tag-chip:hover { border-color: var(--accent); color: var(--accent); text-decoration: none; }
.tag-chip.active { background: var(--accent); color: #fff; border-color: var(--accent); }
/* ── Search Meta & Sort ─────────────────────────────────────────── */
.search-meta {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 16px;
font-size: 0.9rem;
color: var(--ink-light);
}
.sort-toggle a {
color: var(--ink-light);
font-size: 0.85rem;
}
.sort-toggle a.active { color: var(--accent); font-weight: 600; }
.sort-toggle a:hover { color: var(--accent); text-decoration: none; }
.sort-divider { color: var(--border); margin: 0 4px; }
/* ── Search Highlight ───────────────────────────────────────────── */
mark {
background: #fff3cd;
color: var(--ink);
padding: 1px 2px;
border-radius: 2px;
}
.paper-snippet {
margin-top: 8px;
color: var(--ink-light);
font-size: 0.92rem;
line-height: 1.6;
}
.paper-date {
margin-left: 12px;
font-size: 0.82rem;
color: var(--ink-light);
}
/* ── Pagination ─────────────────────────────────────────────────── */
.pagination {
display: flex;
justify-content: center;
align-items: center;
gap: 16px;
margin-top: 32px;
padding-top: 16px;
}
.page-btn {
padding: 6px 14px;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
font-size: 0.85rem;
color: var(--ink-light);
}
.page-btn:hover { border-color: var(--accent); color: var(--accent); text-decoration: none; }
.page-info {
font-size: 0.85rem;
color: var(--ink-light);
}
/* ── Reading List ───────────────────────────────────────────────── */
.page-heading {
font-family: var(--font-body);
font-size: 1.5rem;
font-weight: 700;
margin-bottom: 20px;
}
.reading-list-filters {
display: flex;
gap: 6px;
flex-wrap: wrap;
margin-bottom: 16px;
}
.filter-chip {
display: inline-block;
padding: 6px 14px;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
font-size: 0.85rem;
color: var(--ink-light);
}
.filter-chip:hover { border-color: var(--accent); color: var(--accent); text-decoration: none; }
.filter-chip.active { background: var(--accent); color: #fff; border-color: var(--accent); }
/* ── Paper Card Footer (enhanced) ──────────────────────────────── */
.paper-footer {
margin-top: 12px;
display: flex;
justify-content: space-between;
align-items: center;
}
.paper-footer-left {
display: flex;
align-items: center;
gap: 8px;
}
.paper-footer-right {
display: flex;
align-items: center;
gap: 10px;
}
/* ── Bookmark Button ────────────────────────────────────────────── */
.btn-bookmark {
background: none;
border: none;
font-size: 1.2rem;
cursor: pointer;
color: var(--ink-light);
padding: 2px 4px;
transition: color 0.2s;
line-height: 1;
}
.btn-bookmark:hover { color: var(--accent); }
.btn-bookmark.active { color: #f0a500; }
/* ── Reading Badge ──────────────────────────────────────────────── */
.reading-badge {
font-size: 0.75rem;
padding: 2px 6px;
border-radius: 3px;
}
.reading-unread { background: #f0f0f0; color: #888; }
.reading-skimmed { background: #e3f2fd; color: #1976d2; }
.reading-read_summary { background: #e8f5e9; color: #388e3c; }
.reading-read_full { background: #e8f5e9; color: #2e7d32; font-weight: 500; }
/* ── Responsive ─────────────────────────────────────────────────── */ /* ── Responsive ─────────────────────────────────────────────────── */
@media (max-width: 640px) { @media (max-width: 640px) {
.container { padding: 16px; } .container { padding: 16px; }
.nav-bar { padding: 10px 16px; } .nav-bar { padding: 10px 16px; }
.nav-search-input { width: 120px; }
.date-nav { gap: 8px; } .date-nav { gap: 8px; }
.date-title { font-size: 1.2rem; } .date-title { font-size: 1.2rem; }
.paper-card { padding: 14px 16px; } .paper-card { padding: 14px 16px; }
.detail-title { font-size: 1.3rem; } .detail-title { font-size: 1.3rem; }
.detail-meta { flex-direction: column; gap: 4px; } .detail-meta { flex-direction: column; gap: 4px; }
.search-form { flex-direction: column; }
.reading-list-filters { gap: 4px; }
.filter-chip { padding: 4px 10px; font-size: 0.8rem; }
} }
+18 -1
View File
@@ -1 +1,18 @@
/* app.js — 基础前端交互HTMX 后续增强) */ /* app.js — 基础前端交互 */
// Ctrl+K 或 / 聚焦搜索框
document.addEventListener("keydown", function (e) {
var input = document.querySelector(".nav-search-input");
if (!input) return;
// 忽略在输入框内的按键
if (e.target.tagName === "INPUT" || e.target.tagName === "TEXTAREA") return;
if ((e.ctrlKey || e.metaKey) && e.key === "k") {
e.preventDefault();
input.focus();
} else if (e.key === "/") {
e.preventDefault();
input.focus();
}
});
+5 -1
View File
@@ -10,8 +10,11 @@
<header class="site-header"> <header class="site-header">
<nav class="nav-bar"> <nav class="nav-bar">
<a href="/" class="nav-brand">📚 HF Daily Papers</a> <a href="/" class="nav-brand">📚 HF Daily Papers</a>
<form class="nav-search" action="/search" method="get">
<input type="text" name="q" placeholder="搜索..." class="nav-search-input">
</form>
<div class="nav-links"> <div class="nav-links">
<a href="/day/{{ today }}">今日</a> <a href="/day/{{ today if today else '' }}">今日</a>
<a href="/search">搜索</a> <a href="/search">搜索</a>
<a href="/reading-list">阅读列表</a> <a href="/reading-list">阅读列表</a>
</div> </div>
@@ -26,6 +29,7 @@
<p>HF Daily Papers — 中文论文导览站 · 数据来源于 <a href="https://huggingface.co/papers" target="_blank">HuggingFace</a></p> <p>HF Daily Papers — 中文论文导览站 · 数据来源于 <a href="https://huggingface.co/papers" target="_blank">HuggingFace</a></p>
</footer> </footer>
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
<script src="/static/js/app.js"></script> <script src="/static/js/app.js"></script>
{% block scripts %}{% endblock %} {% block scripts %}{% endblock %}
</body> </body>
+32 -11
View File
@@ -28,17 +28,38 @@
</div> </div>
<div class="paper-footer"> <div class="paper-footer">
<span class="summary-badge summary-{{ paper.summary_status.status if paper.summary_status else 'none' }}"> <div class="paper-footer-left">
{% if not paper.summary_status or paper.summary_status.status == 'pending' %} <span class="summary-badge summary-{{ paper.summary_status.status if paper.summary_status else 'none' }}">
未总结 {% if not paper.summary_status or paper.summary_status.status == 'pending' %}
{% elif paper.summary_status.status == 'processing' %} 未总结
🔄 总结中 {% elif paper.summary_status.status == 'processing' %}
{% elif paper.summary_status.status == 'failed' or paper.summary_status.status == 'permanent_failure' %} 🔄 总结中
❌ 总结失败 {% elif paper.summary_status.status == 'failed' or paper.summary_status.status == 'permanent_failure' %}
{% elif paper.summary_status.status == 'done' %} ❌ 总结失败
✅ 已总结 {% elif paper.summary_status.status == 'done' %}
✅ 已总结
{% endif %}
</span>
{% if paper.reading_status %}
<span class="reading-badge reading-{{ paper.reading_status.status }}">
{% if paper.reading_status.status == 'unread' %}未读
{% elif paper.reading_status.status == 'skimmed' %}已浏览
{% elif paper.reading_status.status == 'read_summary' %}已读摘要
{% elif paper.reading_status.status == 'read_full' %}已读原文
{% endif %}
</span>
{% endif %} {% endif %}
</span> </div>
<a href="/paper/{{ paper.arxiv_id }}" class="btn-detail">详情 →</a> <div class="paper-footer-right">
<button class="btn-bookmark {% if paper.bookmark %}active{% endif %}"
hx-post="/api/bookmark/{{ paper.arxiv_id }}"
hx-target="#user-data-{{ paper.arxiv_id }}"
hx-swap="outerHTML">
{% if paper.bookmark %}★{% else %}☆{% endif %}
</button>
<a href="/paper/{{ paper.arxiv_id }}" class="btn-detail">详情 →</a>
</div>
</div> </div>
{# HTMX 刷新锚点 — button swap 替换此 div #}
<span id="user-data-{{ paper.arxiv_id }}"></span>
</article> </article>
+51
View File
@@ -0,0 +1,51 @@
{% extends "base.html" %}
{% block title %}{{ page_title }} — HF Daily Papers{% endblock %}
{% block content %}
<section class="reading-list-page">
<h1 class="page-heading">📖 阅读列表</h1>
{# 筛选标签栏 #}
<div class="reading-list-filters">
<a href="/reading-list"
class="filter-chip {% if current_filter == 'all' %}active{% endif %}">全部收藏</a>
<a href="/reading-list?filter=unread"
class="filter-chip {% if current_filter == 'unread' %}active{% endif %}">未读</a>
<a href="/reading-list?filter=skimmed"
class="filter-chip {% if current_filter == 'skimmed' %}active{% endif %}">已浏览</a>
<a href="/reading-list?filter=read_summary"
class="filter-chip {% if current_filter == 'read_summary' %}active{% endif %}">已读摘要</a>
<a href="/reading-list?filter=read_full"
class="filter-chip {% if current_filter == 'read_full' %}active{% endif %}">已读原文</a>
<a href="/reading-list?filter=has_note"
class="filter-chip {% if current_filter == 'has_note' %}active{% endif %}">有笔记</a>
</div>
{# 标签筛选 #}
{% if all_tags %}
<div class="tag-filter">
<span class="tag-filter-label">标签:</span>
<a href="/reading-list?filter={{ current_filter }}"
class="tag-chip {% if not current_tag %}active{% endif %}">全部</a>
{% for t in all_tags %}
<a href="/reading-list?filter={{ current_filter }}&tag={{ t }}"
class="tag-chip {% if t == current_tag %}active{% endif %}">{{ t }}</a>
{% endfor %}
</div>
{% endif %}
{% if papers %}
<div class="paper-list">
{% for paper in papers %}
{% include "partials/paper_card.html" %}
{% endfor %}
</div>
{% else %}
<div class="empty-state">
<p>阅读列表为空</p>
<p class="hint">浏览论文时点击 ☆ 收藏,或设置阅读状态</p>
</div>
{% endif %}
</section>
{% endblock %}
+123
View File
@@ -0,0 +1,123 @@
{% extends "base.html" %}
{% block title %}{{ page_title }} — HF Daily Papers{% endblock %}
{% block content %}
<section class="search-page">
{# 搜索表单 #}
<form class="search-form" method="get" action="/search">
<input type="text" name="q" value="{{ query }}" placeholder="搜索标题、摘要、作者、标签..."
class="search-input" autofocus>
{% if tag %}
<input type="hidden" name="tag" value="{{ tag }}">
{% endif %}
<button type="submit" class="search-btn">搜索</button>
</form>
{# 标签筛选 #}
{% if all_tags %}
<div class="tag-filter">
<span class="tag-filter-label">标签:</span>
<a href="/search{% if query %}?q={{ query }}{% endif %}"
class="tag-chip {% if not tag %}active{% endif %}">全部</a>
{% for t in all_tags %}
<a href="/search?q={{ query }}&tag={{ t }}"
class="tag-chip {% if t == tag %}active{% endif %}">{{ t }}</a>
{% endfor %}
</div>
{% endif %}
{% if query or tag %}
{# 搜索结果元信息 #}
<div class="search-meta">
<span>找到 {{ total }} 条结果</span>
<div class="sort-toggle">
<a href="/search?q={{ query }}&tag={{ tag }}&sort=relevance"
class="{% if sort == 'relevance' %}active{% endif %}">相关性</a>
<span class="sort-divider">|</span>
<a href="/search?q={{ query }}&tag={{ tag }}&sort=date"
class="{% if sort == 'date' %}active{% endif %}">日期</a>
</div>
</div>
{% if results %}
<div class="paper-list">
{% for paper in results %}
<article class="paper-card search-result" data-arxiv="{{ paper.arxiv_id }}">
<div class="paper-card-header">
<h2 class="paper-title">
<a href="/paper/{{ paper.arxiv_id }}">
{% set snippet = snippets.get(paper.id, {}) %}
{% if snippet and snippet.title_zh %}
{{ snippet.title_zh | safe }}
{% elif paper.title_zh %}
{{ paper.title_zh }}
{% else %}
{{ paper.title_en }}
{% endif %}
</a>
</h2>
<span class="paper-upvotes">👍 {{ paper.upvotes }}</span>
</div>
{% if snippet and snippet.abstract %}
<p class="paper-snippet">{{ snippet.abstract | safe }}</p>
{% elif paper.summary and paper.summary.one_line %}
<p class="paper-one-line">{{ paper.summary.one_line }}</p>
{% elif paper.abstract %}
<p class="paper-abstract-preview">{{ paper.abstract[:200] }}{% if paper.abstract|length > 200 %}…{% endif %}</p>
{% endif %}
<div class="paper-meta">
<span class="paper-authors">
{{ paper.authors|map(attribute='name')|join(', ')|truncate(80) }}
</span>
<span class="paper-date">{{ paper.paper_date }}</span>
</div>
<div class="paper-tags">
{% for t in paper.tags[:5] %}
<span class="tag">{{ t.tag }}</span>
{% endfor %}
</div>
<div class="paper-footer">
<span class="summary-badge summary-{{ paper.summary_status.status if paper.summary_status else 'none' }}">
{% if not paper.summary_status or paper.summary_status.status == 'pending' %}
未总结
{% elif paper.summary_status.status == 'processing' %}
🔄 总结中
{% elif paper.summary_status.status in ('failed', 'permanent_failure') %}
❌ 总结失败
{% elif paper.summary_status.status == 'done' %}
✅ 已总结
{% endif %}
</span>
<a href="/paper/{{ paper.arxiv_id }}" class="btn-detail">详情 →</a>
</div>
</article>
{% endfor %}
</div>
{# 分页 #}
{% if total_pages > 1 %}
<nav class="pagination">
{% if page > 1 %}
<a href="/search?q={{ query }}&tag={{ tag }}&sort={{ sort }}&page={{ page - 1 }}" class="page-btn">← 上一页</a>
{% endif %}
<span class="page-info">{{ page }} / {{ total_pages }}</span>
{% if page < total_pages %}
<a href="/search?q={{ query }}&tag={{ tag }}&sort={{ sort }}&page={{ page + 1 }}" class="page-btn">下一页 →</a>
{% endif %}
</nav>
{% endif %}
{% else %}
<div class="empty-state">
<p>没有找到匹配的论文</p>
<p class="hint">试试其他关键词或标签</p>
</div>
{% endif %}
{% endif %}
</section>
{% endblock %}
+2
View File
@@ -11,6 +11,7 @@ import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlalchemy import create_engine, event from sqlalchemy import create_engine, event
from sqlalchemy.orm import DeclarativeBase, sessionmaker from sqlalchemy.orm import DeclarativeBase, sessionmaker
from sqlalchemy.pool import StaticPool
from app.database import get_db from app.database import get_db
from app.main import create_app from app.main import create_app
@@ -43,6 +44,7 @@ def db_engine():
engine = create_engine( engine = create_engine(
"sqlite:///:memory:", "sqlite:///:memory:",
connect_args={"check_same_thread": False}, connect_args={"check_same_thread": False},
poolclass=StaticPool,
) )
@event.listens_for(engine, "connect") @event.listens_for(engine, "connect")
+267
View File
@@ -0,0 +1,267 @@
"""搜索、阅读列表、RSS Feed 测试。"""
from __future__ import annotations
import pytest
from datetime import date, datetime, timezone
# ═══════════════════════════════════════════════════════════════════════
# 搜索服务单元测试
# ═══════════════════════════════════════════════════════════════════════
class TestSearchService:
"""app/services/searcher.py 单元测试。"""
def test_search_by_title(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="Test Paper")
assert result["total"] == 1
assert result["results"][0].arxiv_id == "2401.12345"
def test_search_by_abstract(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="test abstract")
assert result["total"] == 1
def test_search_by_author(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="Alice")
assert result["total"] == 1
def test_search_by_tag_in_fts(self, db_session, sample_paper):
from app.services.searcher import search_papers
# FTS5 索引中包含 tags 列,可以搜到
result = search_papers(db_session, query="NLP")
assert result["total"] == 1
def test_search_no_results(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="quantum entanglement")
assert result["total"] == 0
assert result["results"] == []
def test_search_empty_query_returns_empty(self, db_session):
from app.services.searcher import search_papers
result = search_papers(db_session, query="")
assert result["total"] == 0
assert result["results"] == []
def test_search_special_characters_sanitized(self, db_session, sample_paper):
from app.services.searcher import search_papers
# 特殊字符被清除后,剩下 "Test" 仍然能搜到
result = search_papers(db_session, query='Test "Paper" {test}')
assert result["total"] >= 1
def test_search_with_tag_filter(self, db_session, sample_paper):
from app.services.searcher import search_papers
# 关键词 + 标签筛选
result = search_papers(db_session, query="Paper", tag="NLP")
assert result["total"] == 1
# 标签不匹配 → 0
result2 = search_papers(db_session, query="Paper", tag="nonexistent")
assert result2["total"] == 0
def test_search_tag_only_no_query(self, db_session, sample_paper):
from app.services.searcher import search_papers
# 只有标签,无关键词
result = search_papers(db_session, tag="NLP")
assert result["total"] == 1
assert result["results"][0].arxiv_id == "2401.12345"
def test_search_pagination(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="Test", page=2, page_size=10)
assert result["page"] == 2
assert result["total_pages"] == 1 # 只有 1 条结果,1 页
def test_search_returns_snippets(self, db_session, sample_paper):
from app.services.searcher import search_papers
result = search_papers(db_session, query="test abstract")
assert result["total"] == 1
paper_id = result["results"][0].id
assert paper_id in result["snippets"]
snippet = result["snippets"][paper_id]
assert "abstract" in snippet
def test_get_all_tags(self, db_session, sample_paper):
from app.services.searcher import get_all_tags
tags = get_all_tags(db_session)
assert "NLP" in tags
assert "LLM" in tags
# ═══════════════════════════════════════════════════════════════════════
# 搜索路由 HTTP 测试
# ═══════════════════════════════════════════════════════════════════════
class TestSearchRoutes:
"""搜索页面和 JSON API 路由测试。"""
def test_search_page_renders(self, client):
"""GET /search 返回 200。"""
resp = client.get("/search")
assert resp.status_code == 200
assert "搜索" in resp.text
def test_search_page_with_query(self, client, sample_paper):
"""GET /search?q=Test 返回搜索结果。"""
resp = client.get("/search?q=Test")
assert resp.status_code == 200
assert "2401.12345" in resp.text
def test_search_page_with_tag(self, client, sample_paper):
"""GET /search?tag=NLP 返回标签筛选结果。"""
resp = client.get("/search?tag=NLP")
assert resp.status_code == 200
assert "2401.12345" in resp.text
def test_search_api_json(self, client, sample_paper):
"""GET /api/search?q=Test 返回 JSON。"""
resp = client.get("/api/search?q=Test")
assert resp.status_code == 200
data = resp.json()
assert data["total"] >= 1
assert any(p["arxiv_id"] == "2401.12345" for p in data["results"])
def test_search_api_with_tag(self, client, sample_paper):
"""GET /api/search?q=Test&tag=NLP 返回筛选结果。"""
resp = client.get("/api/search?q=Test&tag=NLP")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
def test_search_api_empty(self, client, sample_paper):
"""GET /api/search?q=nonexistent 返回空结果。"""
resp = client.get("/api/search?q=nonexistent")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
def test_search_api_sort_by_date(self, client, sample_paper):
"""GET /api/search?q=Test&sort=date 按日期排序。"""
resp = client.get("/api/search?q=Test&sort=date")
assert resp.status_code == 200
data = resp.json()
assert data["total"] >= 1
# ═══════════════════════════════════════════════════════════════════════
# 阅读列表路由测试
# ═══════════════════════════════════════════════════════════════════════
class TestReadingListRoute:
"""阅读列表页面测试。"""
def test_reading_list_empty(self, client):
"""无收藏时显示空状态。"""
resp = client.get("/reading-list")
assert resp.status_code == 200
assert "阅读列表" in resp.text
def test_reading_list_with_bookmark(self, client, sample_paper):
"""有收藏时显示论文。"""
# 先收藏
client.post("/api/bookmark/2401.12345")
resp = client.get("/reading-list")
assert resp.status_code == 200
assert "2401.12345" in resp.text
def test_reading_list_filter_by_status(self, client, sample_paper):
"""按阅读状态筛选。"""
import json
# 设置阅读状态
client.post(
"/api/reading-status/2401.12345",
json={"status": "read_summary"},
)
# 筛选 read_summary
resp = client.get("/reading-list?filter=read_summary")
assert resp.status_code == 200
assert "2401.12345" in resp.text
# 筛选 unread(不应出现,因为状态是 read_summary
resp2 = client.get("/reading-list?filter=unread")
assert resp2.status_code == 200
assert "2401.12345" not in resp2.text
def test_reading_list_has_note_filter(self, client, sample_paper):
"""筛选有笔记的论文。"""
import json
# 写笔记
client.post(
"/api/note/2401.12345",
json={"content": "这是一条笔记"},
)
resp = client.get("/reading-list?filter=has_note")
assert resp.status_code == 200
assert "2401.12345" in resp.text
# ═══════════════════════════════════════════════════════════════════════
# RSS Feed 测试
# ═══════════════════════════════════════════════════════════════════════
class TestRssFeed:
"""RSS Feed 路由测试。"""
@pytest.fixture(autouse=True)
def _recent_paper(self, db_session, sample_paper):
"""将 sample_paper 的 paper_date 设为今天,确保在 RSS 7 天窗口内。"""
sample_paper.paper_date = date.today()
db_session.commit()
def test_rss_xml_structure(self, client, sample_paper):
"""GET /rss.xml 返回有效 XML。"""
resp = client.get("/rss.xml")
assert resp.status_code == 200
assert "application/xml" in resp.headers["content-type"]
assert "<?xml" in resp.text
assert "<rss" in resp.text
assert "<channel>" in resp.text
assert "2401.12345" in resp.text
def test_rss_has_paper_item(self, client, sample_paper):
"""RSS 包含论文条目。"""
resp = client.get("/rss.xml")
assert "<item>" in resp.text
assert "<title>" in resp.text
assert "/paper/2401.12345" in resp.text
def test_rss_with_tag_filter(self, client, sample_paper):
"""GET /rss.xml?tag=NLP 按标签筛选。"""
resp = client.get("/rss.xml?tag=NLP")
assert resp.status_code == 200
assert "2401.12345" in resp.text
resp2 = client.get("/rss.xml?tag=nonexistent")
assert resp2.status_code == 200
assert "2401.12345" not in resp2.text
def test_rss_uses_chinese_title(self, client, db_session, sample_paper):
"""RSS 使用中文标题(如果有的话)。"""
sample_paper.title_zh = "测试中文标题"
db_session.commit()
resp = client.get("/rss.xml")
assert resp.status_code == 200
assert "测试中文标题" in resp.text
+214
View File
@@ -0,0 +1,214 @@
"""用户数据服务 + 路由测试 — 收藏、阅读状态、笔记。"""
from __future__ import annotations
import json
from datetime import datetime, timezone
# ═══════════════════════════════════════════════════════════════════════
# 收藏服务测试
# ═══════════════════════════════════════════════════════════════════════
class TestBookmarkService:
def test_toggle_bookmark_add(self, db_session, sample_paper):
from app.services.user_data import toggle_bookmark
result = toggle_bookmark(db_session, "2401.12345")
assert result["bookmarked"] is True
assert result["arxiv_id"] == "2401.12345"
def test_toggle_bookmark_remove(self, db_session, sample_paper):
from app.services.user_data import toggle_bookmark
toggle_bookmark(db_session, "2401.12345") # 添加
result = toggle_bookmark(db_session, "2401.12345") # 移除
assert result["bookmarked"] is False
def test_toggle_bookmark_not_found(self, db_session):
from app.services.user_data import toggle_bookmark
result = toggle_bookmark(db_session, "nonexistent")
assert "error" in result
assert result["error"] == "not_found"
# ═══════════════════════════════════════════════════════════════════════
# 阅读状态服务测试
# ═══════════════════════════════════════════════════════════════════════
class TestReadingStatusService:
def test_set_reading_status(self, db_session, sample_paper):
from app.services.user_data import set_reading_status
result = set_reading_status(db_session, "2401.12345", "read_summary")
assert result["status"] == "read_summary"
assert result["arxiv_id"] == "2401.12345"
def test_set_reading_status_invalid(self, db_session, sample_paper):
from app.services.user_data import set_reading_status
result = set_reading_status(db_session, "2401.12345", "invalid_status")
assert "error" in result
assert result["error"] == "invalid_status"
def test_update_existing_status(self, db_session, sample_paper):
from app.services.user_data import set_reading_status
set_reading_status(db_session, "2401.12345", "skimmed")
result = set_reading_status(db_session, "2401.12345", "read_full")
assert result["status"] == "read_full"
def test_set_reading_status_not_found(self, db_session):
from app.services.user_data import set_reading_status
result = set_reading_status(db_session, "nonexistent", "unread")
assert "error" in result
assert result["error"] == "not_found"
def test_all_valid_statuses(self, db_session, sample_paper):
from app.services.user_data import set_reading_status
for status in ("unread", "skimmed", "read_summary", "read_full"):
result = set_reading_status(db_session, "2401.12345", status)
assert result["status"] == status
# ═══════════════════════════════════════════════════════════════════════
# 笔记服务测试
# ═══════════════════════════════════════════════════════════════════════
class TestNoteService:
def test_save_and_get_note(self, db_session, sample_paper):
from app.services.user_data import get_note, save_note
save_note(db_session, "2401.12345", "这是一条测试笔记")
result = get_note(db_session, "2401.12345")
assert result["content"] == "这是一条测试笔记"
assert result["arxiv_id"] == "2401.12345"
assert result["updated_at"] is not None
def test_update_note(self, db_session, sample_paper):
from app.services.user_data import get_note, save_note
save_note(db_session, "2401.12345", "旧笔记")
save_note(db_session, "2401.12345", "新笔记")
result = get_note(db_session, "2401.12345")
assert result["content"] == "新笔记"
def test_get_note_empty(self, db_session, sample_paper):
from app.services.user_data import get_note
result = get_note(db_session, "2401.12345")
assert result["content"] == ""
assert result["updated_at"] is None
def test_get_note_paper_not_found(self, db_session):
from app.services.user_data import get_note
result = get_note(db_session, "nonexistent")
assert result is None
def test_save_note_paper_not_found(self, db_session):
from app.services.user_data import save_note
result = save_note(db_session, "nonexistent", "内容")
assert "error" in result
assert result["error"] == "not_found"
# ═══════════════════════════════════════════════════════════════════════
# 用户数据路由 HTTP 测试
# ═══════════════════════════════════════════════════════════════════════
class TestUserDataRoutes:
"""HTTP 级别的用户数据 API 测试。"""
def test_bookmark_toggle_api(self, client, sample_paper):
"""POST /api/bookmark/{arxiv_id} 切换收藏。"""
resp = client.post("/api/bookmark/2401.12345")
assert resp.status_code == 200
data = resp.json()
assert data["bookmarked"] is True
# 再次切换 → 取消
resp2 = client.post("/api/bookmark/2401.12345")
assert resp2.status_code == 200
data2 = resp2.json()
assert data2["bookmarked"] is False
def test_bookmark_htmx_returns_html(self, client, sample_paper):
"""HTMX 请求返回 HTML 片段。"""
headers = {"HX-Request": "true"}
resp = client.post("/api/bookmark/2401.12345", headers=headers)
assert resp.status_code == 200
assert "btn-bookmark" in resp.text
assert "" in resp.text
def test_bookmark_not_found(self, client):
"""收藏不存在的论文返回 404。"""
resp = client.post("/api/bookmark/nonexistent")
assert resp.status_code == 404
def test_reading_status_api(self, client, sample_paper):
"""POST /api/reading-status/{arxiv_id} 更新状态。"""
resp = client.post(
"/api/reading-status/2401.12345",
json={"status": "read_summary"},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "read_summary"
def test_reading_status_invalid(self, client, sample_paper):
"""无效状态返回 422。"""
resp = client.post(
"/api/reading-status/2401.12345",
json={"status": "invalid"},
)
assert resp.status_code == 422
def test_reading_status_not_found(self, client):
"""不存在的论文返回 404。"""
resp = client.post(
"/api/reading-status/nonexistent",
json={"status": "unread"},
)
assert resp.status_code == 404
def test_note_get_api(self, client, sample_paper):
"""GET /api/note/{arxiv_id} 返回笔记。"""
resp = client.get("/api/note/2401.12345")
assert resp.status_code == 200
data = resp.json()
assert data["content"] == ""
def test_note_save_api(self, client, sample_paper):
"""POST /api/note/{arxiv_id} 保存笔记。"""
resp = client.post(
"/api/note/2401.12345",
json={"content": "Markdown **笔记**"},
)
assert resp.status_code == 200
data = resp.json()
assert data["content"] == "Markdown **笔记**"
assert data["updated_at"] is not None
# 读取确认
resp2 = client.get("/api/note/2401.12345")
assert resp2.json()["content"] == "Markdown **笔记**"
def test_note_not_found(self, client):
"""不存在的论文返回 404。"""
resp = client.get("/api/note/nonexistent")
assert resp.status_code == 404
resp2 = client.post(
"/api/note/nonexistent",
json={"content": "test"},
)
assert resp2.status_code == 404